Skip to content

Commit

Permalink
fix unstable mpp fail tests (#5709)
Browse files Browse the repository at this point in the history
ref #5095, close #5708
  • Loading branch information
windtalker authored Aug 26, 2022
1 parent aecc1df commit 61e436b
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 3 deletions.
1 change: 0 additions & 1 deletion dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
void readSuffixImpl() override
{
LOG_FMT_DEBUG(log, "finish read {} rows from remote", total_rows);
remote_reader->close();
}

void appendInfo(FmtBuffer & buffer) const override
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Flash/Mpp/MPPReceiverSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,16 @@ ExchangeReceiverPtr MPPReceiverSet::getExchangeReceiver(const String & executor_
void MPPReceiverSet::cancel()
{
for (auto & it : exchange_receiver_map)
{
it.second->cancel();
}
for (auto & cop_reader : coprocessor_readers)
cop_reader->cancel();
}

void MPPReceiverSet::close()
{
for (auto & it : exchange_receiver_map)
it.second->close();
for (auto & cop_reader : coprocessor_readers)
cop_reader->close();
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/MPPReceiverSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class MPPReceiverSet
void addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader);
ExchangeReceiverPtr getExchangeReceiver(const String & executor_id) const;
void cancel();
void close();

private:
/// two kinds of receiver in MPP
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,11 @@ void MPPTask::runImpl()
while (from->read())
continue;

// finish DataStream
from->readSuffix();
// finish receiver
receiver_set->close();
// finish MPPTunnel
finishWrite();

const auto & return_statistics = mpp_task_statistics.collectRuntimeStatistics();
Expand Down

0 comments on commit 61e436b

Please sign in to comment.