diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 2d40d1295c5..9120ee98598 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -69,9 +69,27 @@ Block HashJoinProbeBlockInputStream::getHeader() const return join->joinBlock(header_probe_process_info); } +void HashJoinProbeBlockInputStream::finishOneProbe() +{ + bool expect = false; + if likely (probe_finished.compare_exchange_strong(expect, true)) + join->finishOneProbe(); +} + void HashJoinProbeBlockInputStream::cancel(bool kill) { IProfilingBlockInputStream::cancel(kill); + /// When the probe stream quits probe by cancelling instead of normal finish, the Join operator might still produce meaningless blocks + /// and expects these meaningless blocks won't be used to produce meaningful result. + try + { + finishOneProbe(); + } + catch (...) + { + tryLogCurrentException(log, "finishOneProbe failed in cancel() "); + join->meetError(); + } if (non_joined_stream != nullptr) { auto * p_stream = dynamic_cast(non_joined_stream.get()); @@ -112,7 +130,7 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() Block block = children.back()->read(); if (!block) { - join->finishOneProbe(); + finishOneProbe(); if (join->needReturnNonJoinedData()) status = ProbeStatus::WAIT_FOR_READ_NON_JOINED_DATA; else diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index fb4213cff54..11918d28e24 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -60,6 +60,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream FINISHED, }; void readSuffixImpl() override; + void finishOneProbe(); const LoggerPtr log; JoinPtr join; @@ -69,6 +70,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream ProbeStatus status{ProbeStatus::PROBE}; size_t joined_rows = 0; size_t non_joined_rows = 0; + std::atomic probe_finished = false; }; } // namespace DB