Skip to content

Commit

Permalink
Fix hashJoinProb thread leak issue (#6725)
Browse files Browse the repository at this point in the history
close #6692
  • Loading branch information
yibin87 authored Feb 3, 2023
1 parent 2ccbd2b commit 4206a52
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
20 changes: 19 additions & 1 deletion dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,27 @@ Block HashJoinProbeBlockInputStream::getHeader() const
return join->joinBlock(header_probe_process_info);
}

void HashJoinProbeBlockInputStream::finishOneProbe()
{
bool expect = false;
if likely (probe_finished.compare_exchange_strong(expect, true))
join->finishOneProbe();
}

void HashJoinProbeBlockInputStream::cancel(bool kill)
{
IProfilingBlockInputStream::cancel(kill);
/// When the probe stream quits probe by cancelling instead of normal finish, the Join operator might still produce meaningless blocks
/// and expects these meaningless blocks won't be used to produce meaningful result.
try
{
finishOneProbe();
}
catch (...)
{
tryLogCurrentException(log, "finishOneProbe failed in cancel() ");
join->meetError();
}
if (non_joined_stream != nullptr)
{
auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(non_joined_stream.get());
Expand Down Expand Up @@ -112,7 +130,7 @@ Block HashJoinProbeBlockInputStream::getOutputBlock()
Block block = children.back()->read();
if (!block)
{
join->finishOneProbe();
finishOneProbe();
if (join->needReturnNonJoinedData())
status = ProbeStatus::WAIT_FOR_READ_NON_JOINED_DATA;
else
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
FINISHED,
};
void readSuffixImpl() override;
void finishOneProbe();

const LoggerPtr log;
JoinPtr join;
Expand All @@ -69,6 +70,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
ProbeStatus status{ProbeStatus::PROBE};
size_t joined_rows = 0;
size_t non_joined_rows = 0;
std::atomic<bool> probe_finished = false;
};

} // namespace DB

0 comments on commit 4206a52

Please sign in to comment.