From 5790aea893c8d4360f92708181324d89b56fbde5 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 23 Mar 2023 18:51:05 +0800 Subject: [PATCH 01/10] step1 --- .../HashJoinProbeBlockInputStream.cpp | 101 +++++++++--------- .../HashJoinProbeBlockInputStream.h | 9 +- dbms/src/DataStreams/HashJoinProbeExec.h | 39 +++++++ 3 files changed, 91 insertions(+), 58 deletions(-) create mode 100644 dbms/src/DataStreams/HashJoinProbeExec.h diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 65143969bd9..b08e877914f 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -26,19 +26,20 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( UInt64 max_block_size_) : log(Logger::get(req_id)) , original_join(join_) - , join(original_join) - , need_output_non_joined_data(join->needReturnNonJoinedData()) + , probe_exec(std::make_shared()) + , need_output_non_joined_data(join_->needReturnNonJoinedData()) , current_non_joined_stream_index(non_joined_stream_index) , max_block_size(max_block_size_) , probe_process_info(max_block_size_) { children.push_back(input); - current_probe_stream = children.back(); + probe_exec->probe_stream = children.back(); - RUNTIME_CHECK_MSG(join != nullptr, "join ptr should not be null."); - RUNTIME_CHECK_MSG(join->getProbeConcurrency() > 0, "Join probe concurrency must be greater than 0"); + probe_exec->join = original_join; + RUNTIME_CHECK_MSG(probe_exec->join != nullptr, "join ptr should not be null."); + RUNTIME_CHECK_MSG(probe_exec->join->getProbeConcurrency() > 0, "Join probe concurrency must be greater than 0"); if (need_output_non_joined_data) - non_joined_stream = join->createStreamWithNonJoinedRows(input->getHeader(), current_non_joined_stream_index, join->getProbeConcurrency(), max_block_size); + probe_exec->non_joined_stream = probe_exec->join->createStreamWithNonJoinedRows(input->getHeader(), current_non_joined_stream_index, probe_exec->join->getProbeConcurrency(), max_block_size); } Block HashJoinProbeBlockInputStream::getHeader() const @@ -61,10 +62,10 @@ void HashJoinProbeBlockInputStream::cancel(bool kill) RestoreInfo restore_info; { std::lock_guard lock(mutex); - current_join = join; - restore_info.non_joined_stream = non_joined_stream; - restore_info.build_stream = restore_build_stream; - restore_info.probe_stream = restore_probe_stream; + current_join = probe_exec->join; + restore_info.non_joined_stream = probe_exec->non_joined_stream; + restore_info.build_stream = probe_exec->restore_build_stream; + restore_info.probe_stream = probe_exec->probe_stream; } /// Cancel join just wake up all the threads waiting in Join::waitUntilAllBuildFinished/Join::waitUntilAllProbeFinished, /// the ongoing join process will not be interrupted @@ -108,7 +109,7 @@ Block HashJoinProbeBlockInputStream::readImpl() catch (...) { auto error_message = getCurrentExceptionMessage(false, true); - join->meetError(error_message); + probe_exec->join->meetError(error_message); throw Exception(error_message); } } @@ -120,10 +121,10 @@ void HashJoinProbeBlockInputStream::readSuffixImpl() void HashJoinProbeBlockInputStream::onCurrentProbeDone() { - if (join->isRestoreJoin()) - current_probe_stream->readSuffix(); - join->finishOneProbe(); - if (need_output_non_joined_data || join->isEnableSpill()) + if (probe_exec->join->isRestoreJoin()) + probe_exec->probe_stream->readSuffix(); + probe_exec->join->finishOneProbe(); + if (need_output_non_joined_data || probe_exec->join->isEnableSpill()) { status = ProbeStatus::WAIT_PROBE_FINISH; } @@ -135,14 +136,14 @@ void HashJoinProbeBlockInputStream::onCurrentProbeDone() void HashJoinProbeBlockInputStream::onCurrentReadNonJoinedDataDone() { - non_joined_stream->readSuffix(); - if (!join->isEnableSpill()) + probe_exec->non_joined_stream->readSuffix(); + if (!probe_exec->join->isEnableSpill()) { status = ProbeStatus::FINISHED; } else { - join->finishOneNonJoin(current_non_joined_stream_index); + probe_exec->join->finishOneNonJoin(current_non_joined_stream_index); status = ProbeStatus::GET_RESTORE_JOIN; } } @@ -152,17 +153,17 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin() /// find restore join in DFS way while (true) { - assert(join->isEnableSpill()); + assert(probe_exec->join->isEnableSpill()); /// first check if current join has a partition to restore - if (join->hasPartitionSpilledWithLock()) + if (probe_exec->join->hasPartitionSpilledWithLock()) { - auto restore_info = join->getOneRestoreStream(max_block_size); + auto restore_info = probe_exec->join->getOneRestoreStream(max_block_size); /// get a restore join if (restore_info.join) { /// restored join should always enable spill assert(restore_info.join->isEnableSpill()); - parents.push_back(join); + parents.push_back(probe_exec); { std::lock_guard lock(mutex); if (isCancelledOrThrowIfKilled()) @@ -170,18 +171,18 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin() status = ProbeStatus::FINISHED; return; } - join = restore_info.join; - restore_build_stream = restore_info.build_stream; - restore_probe_stream = restore_info.probe_stream; - non_joined_stream = restore_info.non_joined_stream; - current_probe_stream = restore_probe_stream; - if (non_joined_stream != nullptr) - current_non_joined_stream_index = dynamic_cast(non_joined_stream.get())->getNonJoinedIndex(); + probe_exec = std::make_shared(); + probe_exec->join = restore_info.join; + probe_exec->restore_build_stream = restore_info.build_stream; + probe_exec->probe_stream = restore_info.probe_stream; + probe_exec->non_joined_stream = restore_info.non_joined_stream; + if (probe_exec->non_joined_stream != nullptr) + current_non_joined_stream_index = dynamic_cast(probe_exec->non_joined_stream.get())->getNonJoinedIndex(); } status = ProbeStatus::RESTORE_BUILD; return; } - assert(join->hasPartitionSpilledWithLock() == false); + assert(probe_exec->join->hasPartitionSpilledWithLock() == false); } /// current join has no more partition to restore, so check if previous join still has partition to restore if (!parents.empty()) @@ -195,12 +196,8 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin() } else { - join = parents.back(); + probe_exec = parents.back(); parents.pop_back(); - restore_probe_stream = nullptr; - restore_build_stream = nullptr; - current_probe_stream = nullptr; - non_joined_stream = nullptr; } } else @@ -216,9 +213,9 @@ void HashJoinProbeBlockInputStream::onAllProbeDone() { if (need_output_non_joined_data) { - assert(non_joined_stream != nullptr); + assert(probe_exec->non_joined_stream != nullptr); status = ProbeStatus::READ_NON_JOINED_DATA; - non_joined_stream->readPrefix(); + probe_exec->non_joined_stream->readPrefix(); } else { @@ -235,15 +232,15 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() switch (status) { case ProbeStatus::WAIT_BUILD_FINISH: - join->waitUntilAllBuildFinished(); + probe_exec->join->waitUntilAllBuildFinished(); /// after Build finish, always go to Probe stage - if (join->isRestoreJoin()) - current_probe_stream->readSuffix(); + if (probe_exec->join->isRestoreJoin()) + probe_exec->probe_stream->readSuffix(); status = ProbeStatus::PROBE; break; case ProbeStatus::PROBE: { - assert(current_probe_stream != nullptr); + assert(probe_exec->probe_stream != nullptr); if (probe_process_info.all_rows_joined_finish) { auto [partition_index, block] = getOneProbeBlock(); @@ -254,21 +251,21 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() } else { - join->checkTypes(block); + probe_exec->join->checkTypes(block); probe_process_info.resetBlock(std::move(block), partition_index); } } - auto ret = join->joinBlock(probe_process_info); + auto ret = probe_exec->join->joinBlock(probe_process_info); joined_rows += ret.rows(); return ret; } case ProbeStatus::WAIT_PROBE_FINISH: - join->waitUntilAllProbeFinished(); + probe_exec->join->waitUntilAllProbeFinished(); onAllProbeDone(); break; case ProbeStatus::READ_NON_JOINED_DATA: { - auto block = non_joined_stream->read(); + auto block = probe_exec->non_joined_stream->read(); non_joined_rows += block.rows(); if (!block) { @@ -285,9 +282,9 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() case ProbeStatus::RESTORE_BUILD: { probe_process_info.all_rows_joined_finish = true; - restore_build_stream->readPrefix(); - while (restore_build_stream->read()) {}; - restore_build_stream->readSuffix(); + probe_exec->restore_build_stream->readPrefix(); + while (probe_exec->restore_build_stream->read()) {}; + probe_exec->restore_build_stream->readSuffix(); status = ProbeStatus::WAIT_BUILD_FINISH; break; } @@ -311,9 +308,9 @@ std::tuple HashJoinProbeBlockInputStream::getOneProbeBlock() /// Even if spill is enabled, if spill is not triggered during build, /// there is no need to dispatch probe block - if (!join->isSpilled()) + if (!probe_exec->join->isSpilled()) { - block = current_probe_stream->read(); + block = probe_exec->probe_stream->read(); } else { @@ -329,9 +326,9 @@ std::tuple HashJoinProbeBlockInputStream::getOneProbeBlock() } else { - auto new_block = current_probe_stream->read(); + auto new_block = probe_exec->probe_stream->read(); if (new_block) - join->dispatchProbeBlock(new_block, probe_partition_blocks); + probe_exec->join->dispatchProbeBlock(new_block, probe_partition_blocks); else break; } diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index f643de3a1ea..36fa3f88757 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -16,6 +16,7 @@ #include #include +#include #include namespace DB @@ -125,19 +126,15 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream /// read them, so need to protect the multi-threads access std::mutex mutex; JoinPtr original_join; - JoinPtr join; + HashJoinProbeExecPtr probe_exec; const bool need_output_non_joined_data; size_t current_non_joined_stream_index; - BlockInputStreamPtr current_probe_stream; UInt64 max_block_size; ProbeProcessInfo probe_process_info; - BlockInputStreamPtr non_joined_stream; - BlockInputStreamPtr restore_build_stream; - BlockInputStreamPtr restore_probe_stream; ProbeStatus status{ProbeStatus::WAIT_BUILD_FINISH}; size_t joined_rows = 0; size_t non_joined_rows = 0; - std::list parents; + std::list parents; std::list> probe_partition_blocks; }; diff --git a/dbms/src/DataStreams/HashJoinProbeExec.h b/dbms/src/DataStreams/HashJoinProbeExec.h new file mode 100644 index 00000000000..352d0f164c5 --- /dev/null +++ b/dbms/src/DataStreams/HashJoinProbeExec.h @@ -0,0 +1,39 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include + +#pragma once + +namespace DB +{ +class HashJoinProbeExec; +using HashJoinProbeExecPtr = std::shared_ptr; + +class HashJoinProbeExec +{ +public: + JoinPtr join; + + BlockInputStreamPtr restore_build_stream; + + BlockInputStreamPtr probe_stream; + + BlockInputStreamPtr non_joined_stream; +}; +} // namespace DB From 6018415724f9d6c4c87337230481e95e158d4dbe Mon Sep 17 00:00:00 2001 From: SeaRise Date: Fri, 24 Mar 2023 11:23:05 +0800 Subject: [PATCH 02/10] update --- .../HashJoinProbeBlockInputStream.cpp | 186 +++++------------- .../HashJoinProbeBlockInputStream.h | 16 +- dbms/src/DataStreams/HashJoinProbeExec.cpp | 164 +++++++++++++++ dbms/src/DataStreams/HashJoinProbeExec.h | 64 ++++++ 4 files changed, 282 insertions(+), 148 deletions(-) create mode 100644 dbms/src/DataStreams/HashJoinProbeExec.cpp diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index b08e877914f..a00042d7c68 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -26,20 +26,36 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( UInt64 max_block_size_) : log(Logger::get(req_id)) , original_join(join_) - , probe_exec(std::make_shared()) - , need_output_non_joined_data(join_->needReturnNonJoinedData()) - , current_non_joined_stream_index(non_joined_stream_index) - , max_block_size(max_block_size_) , probe_process_info(max_block_size_) { children.push_back(input); - probe_exec->probe_stream = children.back(); - probe_exec->join = original_join; - RUNTIME_CHECK_MSG(probe_exec->join != nullptr, "join ptr should not be null."); - RUNTIME_CHECK_MSG(probe_exec->join->getProbeConcurrency() > 0, "Join probe concurrency must be greater than 0"); + RUNTIME_CHECK_MSG(original_join != nullptr, "join ptr should not be null."); + RUNTIME_CHECK_MSG(original_join->getProbeConcurrency() > 0, "Join probe concurrency must be greater than 0"); + + bool need_output_non_joined_data = original_join->needReturnNonJoinedData(); + BlockInputStreamPtr non_joined_stream = nullptr; if (need_output_non_joined_data) - probe_exec->non_joined_stream = probe_exec->join->createStreamWithNonJoinedRows(input->getHeader(), current_non_joined_stream_index, probe_exec->join->getProbeConcurrency(), max_block_size); + non_joined_stream = original_join->createStreamWithNonJoinedRows(input->getHeader(), non_joined_stream_index, original_join->getProbeConcurrency(), max_block_size_); + + auto cur_probe_exec = std::make_shared( + original_join, + nullptr, + input, + need_output_non_joined_data, + non_joined_stream_index, + non_joined_stream, + max_block_size_); + probe_exec.set(std::move(cur_probe_exec)); +} + +void HashJoinProbeBlockInputStream::readPrefix() +{ +} + +void HashJoinProbeBlockInputStream::readSuffix() +{ + LOG_DEBUG(log, "Finish join probe, total output rows {}, joined rows {}, non joined rows {}", joined_rows + non_joined_rows, joined_rows, non_joined_rows); } Block HashJoinProbeBlockInputStream::getHeader() const @@ -58,15 +74,6 @@ void HashJoinProbeBlockInputStream::cancel(bool kill) /// When the probe stream quits probe by cancelling instead of normal finish, the Join operator might still produce meaningless blocks /// and expects these meaningless blocks won't be used to produce meaningful result. - JoinPtr current_join; - RestoreInfo restore_info; - { - std::lock_guard lock(mutex); - current_join = probe_exec->join; - restore_info.non_joined_stream = probe_exec->non_joined_stream; - restore_info.build_stream = probe_exec->restore_build_stream; - restore_info.probe_stream = probe_exec->probe_stream; - } /// Cancel join just wake up all the threads waiting in Join::waitUntilAllBuildFinished/Join::waitUntilAllProbeFinished, /// the ongoing join process will not be interrupted /// There is a little bit hack here because cancel will be called in two cases: @@ -78,25 +85,7 @@ void HashJoinProbeBlockInputStream::cancel(bool kill) /// threads is not 100% safe because if the probe thread is wake up when build is not finished yet, it may produce wrong results, for now /// it is safe because when any of the data stream read empty block because of early exit, the execution framework ensures that no further /// data will be used. - current_join->cancel(); - if (restore_info.non_joined_stream != nullptr) - { - auto * p_stream = dynamic_cast(restore_info.non_joined_stream.get()); - if (p_stream != nullptr) - p_stream->cancel(kill); - } - if (restore_info.probe_stream != nullptr) - { - auto * p_stream = dynamic_cast(restore_info.probe_stream.get()); - if (p_stream != nullptr) - p_stream->cancel(kill); - } - if (restore_info.build_stream != nullptr) - { - auto * p_stream = dynamic_cast(restore_info.build_stream.get()); - if (p_stream != nullptr) - p_stream->cancel(kill); - } + probe_exec->cancel(); } Block HashJoinProbeBlockInputStream::readImpl() @@ -109,43 +98,19 @@ Block HashJoinProbeBlockInputStream::readImpl() catch (...) { auto error_message = getCurrentExceptionMessage(false, true); - probe_exec->join->meetError(error_message); + probe_exec->meetError(error_message); throw Exception(error_message); } } -void HashJoinProbeBlockInputStream::readSuffixImpl() -{ - LOG_DEBUG(log, "Finish join probe, total output rows {}, joined rows {}, non joined rows {}", joined_rows + non_joined_rows, joined_rows, non_joined_rows); -} - void HashJoinProbeBlockInputStream::onCurrentProbeDone() { - if (probe_exec->join->isRestoreJoin()) - probe_exec->probe_stream->readSuffix(); - probe_exec->join->finishOneProbe(); - if (need_output_non_joined_data || probe_exec->join->isEnableSpill()) - { - status = ProbeStatus::WAIT_PROBE_FINISH; - } - else - { - status = ProbeStatus::FINISHED; - } + status = probe_exec->onProbeFinish() ? ProbeStatus::FINISHED : ProbeStatus::WAIT_PROBE_FINISH; } void HashJoinProbeBlockInputStream::onCurrentReadNonJoinedDataDone() { - probe_exec->non_joined_stream->readSuffix(); - if (!probe_exec->join->isEnableSpill()) - { - status = ProbeStatus::FINISHED; - } - else - { - probe_exec->join->finishOneNonJoin(current_non_joined_stream_index); - status = ProbeStatus::GET_RESTORE_JOIN; - } + status = probe_exec->onNonJoinedFinish() ? ProbeStatus::FINISHED : ProbeStatus::GET_RESTORE_JOIN; } void HashJoinProbeBlockInputStream::tryGetRestoreJoin() @@ -153,42 +118,25 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin() /// find restore join in DFS way while (true) { - assert(probe_exec->join->isEnableSpill()); - /// first check if current join has a partition to restore - if (probe_exec->join->hasPartitionSpilledWithLock()) + if (isCancelledOrThrowIfKilled()) { - auto restore_info = probe_exec->join->getOneRestoreStream(max_block_size); - /// get a restore join - if (restore_info.join) - { - /// restored join should always enable spill - assert(restore_info.join->isEnableSpill()); - parents.push_back(probe_exec); - { - std::lock_guard lock(mutex); - if (isCancelledOrThrowIfKilled()) - { - status = ProbeStatus::FINISHED; - return; - } - probe_exec = std::make_shared(); - probe_exec->join = restore_info.join; - probe_exec->restore_build_stream = restore_info.build_stream; - probe_exec->probe_stream = restore_info.probe_stream; - probe_exec->non_joined_stream = restore_info.non_joined_stream; - if (probe_exec->non_joined_stream != nullptr) - current_non_joined_stream_index = dynamic_cast(probe_exec->non_joined_stream.get())->getNonJoinedIndex(); - } - status = ProbeStatus::RESTORE_BUILD; - return; - } - assert(probe_exec->join->hasPartitionSpilledWithLock() == false); + status = ProbeStatus::FINISHED; + return; + } + + auto cur_probe_exec = *probe_exec; + auto restore_probe_exec = cur_probe_exec->tryGetRestoreExec(); + if (restore_probe_exec.has_value()) + { + parents.push_back(std::move(cur_probe_exec)); + probe_exec.set(std::move(*restore_probe_exec)); + status = ProbeStatus::RESTORE_BUILD; + return; } /// current join has no more partition to restore, so check if previous join still has partition to restore if (!parents.empty()) { /// replace current join with previous join - std::lock_guard lock(mutex); if (isCancelledOrThrowIfKilled()) { status = ProbeStatus::FINISHED; @@ -196,7 +144,7 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin() } else { - probe_exec = parents.back(); + probe_exec.set(std::move(parents.back())); parents.pop_back(); } } @@ -211,7 +159,7 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin() void HashJoinProbeBlockInputStream::onAllProbeDone() { - if (need_output_non_joined_data) + if (probe_exec->need_output_non_joined_data) { assert(probe_exec->non_joined_stream != nullptr); status = ProbeStatus::READ_NON_JOINED_DATA; @@ -234,16 +182,14 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() case ProbeStatus::WAIT_BUILD_FINISH: probe_exec->join->waitUntilAllBuildFinished(); /// after Build finish, always go to Probe stage - if (probe_exec->join->isRestoreJoin()) - probe_exec->probe_stream->readSuffix(); + probe_exec->onProbeStart(); status = ProbeStatus::PROBE; break; case ProbeStatus::PROBE: { - assert(probe_exec->probe_stream != nullptr); if (probe_process_info.all_rows_joined_finish) { - auto [partition_index, block] = getOneProbeBlock(); + auto [partition_index, block] = probe_exec->getProbeBlock(); if (!block) { onCurrentProbeDone(); @@ -282,9 +228,7 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() case ProbeStatus::RESTORE_BUILD: { probe_process_info.all_rows_joined_finish = true; - probe_exec->restore_build_stream->readPrefix(); - while (probe_exec->restore_build_stream->read()) {}; - probe_exec->restore_build_stream->readSuffix(); + probe_exec->restoreBuild(); status = ProbeStatus::WAIT_BUILD_FINISH; break; } @@ -301,40 +245,4 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() } } -std::tuple HashJoinProbeBlockInputStream::getOneProbeBlock() -{ - size_t partition_index = 0; - Block block; - - /// Even if spill is enabled, if spill is not triggered during build, - /// there is no need to dispatch probe block - if (!probe_exec->join->isSpilled()) - { - block = probe_exec->probe_stream->read(); - } - else - { - while (true) - { - if (!probe_partition_blocks.empty()) - { - auto partition_block = probe_partition_blocks.front(); - probe_partition_blocks.pop_front(); - partition_index = std::get<0>(partition_block); - block = std::get<1>(partition_block); - break; - } - else - { - auto new_block = probe_exec->probe_stream->read(); - if (new_block) - probe_exec->join->dispatchProbeBlock(new_block, probe_partition_blocks); - else - break; - } - } - } - return {partition_index, block}; -} - } // namespace DB diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index 36fa3f88757..b0db909f0fd 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -14,9 +14,9 @@ #pragma once +#include #include #include -#include #include namespace DB @@ -49,6 +49,10 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream protected: Block readImpl() override; + // Override readPrefix and readSuffix so that children's readPrefix and readSuffix are not called and are called by probe_exec. + void readPrefix() override; + void readSuffix() override; + private: /* * spill not enabled: @@ -119,23 +123,17 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream void onAllProbeDone(); void onCurrentReadNonJoinedDataDone(); void tryGetRestoreJoin(); - void readSuffixImpl() override; const LoggerPtr log; + JoinPtr original_join; /// join/non_joined_stream/restore_build_stream/restore_probe_stream can be modified during the runtime /// although read/write to those are almost only in 1 thread, but an exception is cancel thread will /// read them, so need to protect the multi-threads access - std::mutex mutex; - JoinPtr original_join; - HashJoinProbeExecPtr probe_exec; - const bool need_output_non_joined_data; - size_t current_non_joined_stream_index; - UInt64 max_block_size; + HashJoinProbeExecHolder probe_exec; ProbeProcessInfo probe_process_info; ProbeStatus status{ProbeStatus::WAIT_BUILD_FINISH}; size_t joined_rows = 0; size_t non_joined_rows = 0; std::list parents; - std::list> probe_partition_blocks; }; } // namespace DB diff --git a/dbms/src/DataStreams/HashJoinProbeExec.cpp b/dbms/src/DataStreams/HashJoinProbeExec.cpp new file mode 100644 index 00000000000..aee7d1c1815 --- /dev/null +++ b/dbms/src/DataStreams/HashJoinProbeExec.cpp @@ -0,0 +1,164 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB +{ +HashJoinProbeExec::HashJoinProbeExec( + const JoinPtr & join_, + const BlockInputStreamPtr & restore_build_stream_, + const BlockInputStreamPtr & probe_stream_, + bool need_output_non_joined_data_, + size_t non_joined_stream_index_, + const BlockInputStreamPtr & non_joined_stream_, + size_t max_block_size_) + : join(join_) + , restore_build_stream(restore_build_stream_) + , probe_stream(probe_stream_) + , need_output_non_joined_data(need_output_non_joined_data_) + , non_joined_stream_index(non_joined_stream_index_) + , non_joined_stream(non_joined_stream_) + , max_block_size(max_block_size_) +{} + +void HashJoinProbeExec::restoreBuild() +{ + restore_build_stream->readPrefix(); + while (restore_build_stream->read()) {}; + restore_build_stream->readSuffix(); +} + +std::tuple HashJoinProbeExec::getProbeBlock() +{ + size_t partition_index = 0; + Block block; + + /// Even if spill is enabled, if spill is not triggered during build, + /// there is no need to dispatch probe block + if (!join->isSpilled()) + { + block = probe_stream->read(); + } + else + { + while (true) + { + if (!probe_partition_blocks.empty()) + { + auto partition_block = probe_partition_blocks.front(); + probe_partition_blocks.pop_front(); + partition_index = std::get<0>(partition_block); + block = std::get<1>(partition_block); + break; + } + else + { + auto new_block = probe_stream->read(); + if (new_block) + join->dispatchProbeBlock(new_block, probe_partition_blocks); + else + break; + } + } + } + return {partition_index, block}; +} + +std::optional HashJoinProbeExec::tryGetRestoreExec() +{ + assert(join->isEnableSpill()); + /// first check if current join has a partition to restore + if (join->hasPartitionSpilledWithLock()) + { + auto restore_info = join->getOneRestoreStream(max_block_size); + /// get a restore join + if (restore_info.join) + { + /// restored join should always enable spill + assert(restore_info.join->isEnableSpill()); + size_t non_joined_stream_index = 0; + if (need_output_non_joined_data) + non_joined_stream_index = dynamic_cast(restore_info.non_joined_stream.get())->getNonJoinedIndex(); + auto restore_probe_exec = std::make_shared( + restore_info.join, + restore_info.build_stream, + restore_info.probe_stream, + need_output_non_joined_data, + non_joined_stream_index, + restore_info.non_joined_stream, + max_block_size); + return {std::move(restore_probe_exec)}; + } + assert(join->hasPartitionSpilledWithLock() == false); + } + return {}; +} + +void HashJoinProbeExec::cancel() +{ + join->cancel(); + if (non_joined_stream != nullptr) + { + auto * p_stream = dynamic_cast(non_joined_stream.get()); + if (p_stream != nullptr) + p_stream->cancel(false); + } + if (probe_stream != nullptr) + { + auto * p_stream = dynamic_cast(probe_stream.get()); + if (p_stream != nullptr) + p_stream->cancel(false); + } + if (restore_build_stream != nullptr) + { + auto * p_stream = dynamic_cast(restore_build_stream.get()); + if (p_stream != nullptr) + p_stream->cancel(false); + } +} + +void HashJoinProbeExec::meetError(const String & error_message) +{ + join->meetError(error_message); +} + +void HashJoinProbeExec::onProbeStart() +{ + probe_stream->readPrefix(); +} + +bool HashJoinProbeExec::onProbeFinish() +{ + probe_stream->readSuffix(); + join->finishOneProbe(); + return !need_output_non_joined_data && !join->isEnableSpill(); +} + +bool HashJoinProbeExec::onNonJoinedFinish() +{ + non_joined_stream->readSuffix(); + if (!join->isEnableSpill()) + { + return true; + } + else + { + join->finishOneNonJoin(non_joined_stream_index); + return false; + } +} +} // namespace DB diff --git a/dbms/src/DataStreams/HashJoinProbeExec.h b/dbms/src/DataStreams/HashJoinProbeExec.h index 352d0f164c5..43e80567070 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.h +++ b/dbms/src/DataStreams/HashJoinProbeExec.h @@ -34,6 +34,70 @@ class HashJoinProbeExec BlockInputStreamPtr probe_stream; + bool need_output_non_joined_data; + size_t non_joined_stream_index; BlockInputStreamPtr non_joined_stream; + + size_t max_block_size; + + std::list> probe_partition_blocks; + +public: + HashJoinProbeExec( + const JoinPtr & join_, + const BlockInputStreamPtr & restore_build_stream_, + const BlockInputStreamPtr & probe_stream_, + bool need_output_non_joined_data_, + size_t non_joined_stream_index_, + const BlockInputStreamPtr & non_joined_stream_, + size_t max_block_size_); + + void restoreBuild(); + + std::tuple getProbeBlock(); + + std::optional tryGetRestoreExec(); + + void cancel(); + + void meetError(const String & error_message); + + void onProbeStart(); + // Returns true if the probe_exec ends. + // Returns false if the probe_exec continues to execute. + bool onProbeFinish(); + + // Returns true if the probe_exec ends. + // Returns false if the probe_exec continues to execute. + bool onNonJoinedFinish(); +}; + +class HashJoinProbeExecHolder +{ +public: + const HashJoinProbeExecPtr & operator->() + { + std::lock_guard lock(mu); + assert(exec); + return exec; + } + + const HashJoinProbeExecPtr & operator*() + { + std::lock_guard lock(mu); + assert(exec); + return exec; + } + + void set(HashJoinProbeExecPtr && new_one) + { + assert(new_one); + std::lock_guard lock(mu); + exec = new_one; + } + +private: + std::mutex mu; + HashJoinProbeExecPtr exec; }; } // namespace DB From 8afe7495951171ff65128e4526479b12e600f5b2 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Fri, 24 Mar 2023 14:23:35 +0800 Subject: [PATCH 03/10] version1 --- .../HashJoinProbeBlockInputStream.cpp | 63 ++++++++----------- .../HashJoinProbeBlockInputStream.h | 5 +- dbms/src/DataStreams/HashJoinProbeExec.cpp | 56 +++++++++-------- 3 files changed, 56 insertions(+), 68 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index a00042d7c68..01aff9f76d6 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -16,6 +16,8 @@ #include #include +#include + namespace DB { HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( @@ -49,11 +51,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( probe_exec.set(std::move(cur_probe_exec)); } -void HashJoinProbeBlockInputStream::readPrefix() -{ -} - -void HashJoinProbeBlockInputStream::readSuffix() +void HashJoinProbeBlockInputStream::readSuffixImpl() { LOG_DEBUG(log, "Finish join probe, total output rows {}, joined rows {}, non joined rows {}", joined_rows + non_joined_rows, joined_rows, non_joined_rows); } @@ -90,27 +88,23 @@ void HashJoinProbeBlockInputStream::cancel(bool kill) Block HashJoinProbeBlockInputStream::readImpl() { - try - { - Block ret = getOutputBlock(); - return ret; - } - catch (...) - { - auto error_message = getCurrentExceptionMessage(false, true); - probe_exec->meetError(error_message); - throw Exception(error_message); - } + return getOutputBlock(); +} + +void HashJoinProbeBlockInputStream::switchStatus(ProbeStatus to) +{ + LOG_TRACE(log, fmt::format("{} -> {}", magic_enum::enum_name(status), magic_enum::enum_name(to))); + status = to; } void HashJoinProbeBlockInputStream::onCurrentProbeDone() { - status = probe_exec->onProbeFinish() ? ProbeStatus::FINISHED : ProbeStatus::WAIT_PROBE_FINISH; + switchStatus(probe_exec->onProbeFinish() ? ProbeStatus::FINISHED : ProbeStatus::WAIT_PROBE_FINISH); } void HashJoinProbeBlockInputStream::onCurrentReadNonJoinedDataDone() { - status = probe_exec->onNonJoinedFinish() ? ProbeStatus::FINISHED : ProbeStatus::GET_RESTORE_JOIN; + switchStatus(probe_exec->onNonJoinedFinish() ? ProbeStatus::FINISHED : ProbeStatus::GET_RESTORE_JOIN); } void HashJoinProbeBlockInputStream::tryGetRestoreJoin() @@ -120,7 +114,7 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin() { if (isCancelledOrThrowIfKilled()) { - status = ProbeStatus::FINISHED; + switchStatus(ProbeStatus::FINISHED); return; } @@ -130,28 +124,20 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin() { parents.push_back(std::move(cur_probe_exec)); probe_exec.set(std::move(*restore_probe_exec)); - status = ProbeStatus::RESTORE_BUILD; + switchStatus(ProbeStatus::RESTORE_BUILD); return; } /// current join has no more partition to restore, so check if previous join still has partition to restore if (!parents.empty()) { /// replace current join with previous join - if (isCancelledOrThrowIfKilled()) - { - status = ProbeStatus::FINISHED; - return; - } - else - { - probe_exec.set(std::move(parents.back())); - parents.pop_back(); - } + probe_exec.set(std::move(parents.back())); + parents.pop_back(); } else { /// no previous join, set status to FINISHED - status = ProbeStatus::FINISHED; + switchStatus(ProbeStatus::FINISHED); return; } } @@ -162,12 +148,12 @@ void HashJoinProbeBlockInputStream::onAllProbeDone() if (probe_exec->need_output_non_joined_data) { assert(probe_exec->non_joined_stream != nullptr); - status = ProbeStatus::READ_NON_JOINED_DATA; probe_exec->non_joined_stream->readPrefix(); + switchStatus(ProbeStatus::READ_NON_JOINED_DATA); } else { - status = ProbeStatus::GET_RESTORE_JOIN; + switchStatus(ProbeStatus::GET_RESTORE_JOIN); } } @@ -183,7 +169,7 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() probe_exec->join->waitUntilAllBuildFinished(); /// after Build finish, always go to Probe stage probe_exec->onProbeStart(); - status = ProbeStatus::PROBE; + switchStatus(ProbeStatus::PROBE); break; case ProbeStatus::PROBE: { @@ -229,7 +215,7 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() { probe_process_info.all_rows_joined_finish = true; probe_exec->restoreBuild(); - status = ProbeStatus::WAIT_BUILD_FINISH; + switchStatus(ProbeStatus::WAIT_BUILD_FINISH); break; } case ProbeStatus::FINISHED: @@ -239,9 +225,10 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() } catch (...) { - /// set status to finish if any exception happens - status = ProbeStatus::FINISHED; - throw; + auto error_message = getCurrentExceptionMessage(true, true); + probe_exec->meetError(error_message); + switchStatus(ProbeStatus::FINISHED); + throw Exception(error_message); } } diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index b0db909f0fd..15ff167def4 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -49,9 +49,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream protected: Block readImpl() override; - // Override readPrefix and readSuffix so that children's readPrefix and readSuffix are not called and are called by probe_exec. - void readPrefix() override; - void readSuffix() override; + void readSuffixImpl() override; private: /* @@ -117,6 +115,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream FINISHED, /// the final state }; + void switchStatus(ProbeStatus to); Block getOutputBlock(); std::tuple getOneProbeBlock(); void onCurrentProbeDone(); diff --git a/dbms/src/DataStreams/HashJoinProbeExec.cpp b/dbms/src/DataStreams/HashJoinProbeExec.cpp index aee7d1c1815..5dc7e4a32b8 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.cpp +++ b/dbms/src/DataStreams/HashJoinProbeExec.cpp @@ -13,8 +13,8 @@ // limitations under the License. #include -#include #include +#include namespace DB { @@ -80,32 +80,32 @@ std::tuple HashJoinProbeExec::getProbeBlock() std::optional HashJoinProbeExec::tryGetRestoreExec() { - assert(join->isEnableSpill()); - /// first check if current join has a partition to restore - if (join->hasPartitionSpilledWithLock()) + assert(join->isEnableSpill()); + /// first check if current join has a partition to restore + if (join->hasPartitionSpilledWithLock()) + { + auto restore_info = join->getOneRestoreStream(max_block_size); + /// get a restore join + if (restore_info.join) { - auto restore_info = join->getOneRestoreStream(max_block_size); - /// get a restore join - if (restore_info.join) - { - /// restored join should always enable spill - assert(restore_info.join->isEnableSpill()); - size_t non_joined_stream_index = 0; - if (need_output_non_joined_data) - non_joined_stream_index = dynamic_cast(restore_info.non_joined_stream.get())->getNonJoinedIndex(); - auto restore_probe_exec = std::make_shared( - restore_info.join, - restore_info.build_stream, - restore_info.probe_stream, - need_output_non_joined_data, - non_joined_stream_index, - restore_info.non_joined_stream, - max_block_size); - return {std::move(restore_probe_exec)}; - } - assert(join->hasPartitionSpilledWithLock() == false); + /// restored join should always enable spill + assert(restore_info.join->isEnableSpill()); + size_t non_joined_stream_index = 0; + if (need_output_non_joined_data) + non_joined_stream_index = dynamic_cast(restore_info.non_joined_stream.get())->getNonJoinedIndex(); + auto restore_probe_exec = std::make_shared( + restore_info.join, + restore_info.build_stream, + restore_info.probe_stream, + need_output_non_joined_data, + non_joined_stream_index, + restore_info.non_joined_stream, + max_block_size); + return {std::move(restore_probe_exec)}; } - return {}; + assert(join->hasPartitionSpilledWithLock() == false); + } + return {}; } void HashJoinProbeExec::cancel() @@ -138,12 +138,14 @@ void HashJoinProbeExec::meetError(const String & error_message) void HashJoinProbeExec::onProbeStart() { - probe_stream->readPrefix(); + if (join->isRestoreJoin()) + probe_stream->readPrefix(); } bool HashJoinProbeExec::onProbeFinish() { - probe_stream->readSuffix(); + if (join->isRestoreJoin()) + probe_stream->readSuffix(); join->finishOneProbe(); return !need_output_non_joined_data && !join->isEnableSpill(); } From 3732c7e7bcfdafcfcf47486c076c28a56e5e72ef Mon Sep 17 00:00:00 2001 From: SeaRise Date: Fri, 24 Mar 2023 14:56:37 +0800 Subject: [PATCH 04/10] update --- .../HashJoinProbeBlockInputStream.cpp | 37 +++++++---------- .../HashJoinProbeBlockInputStream.h | 1 - dbms/src/DataStreams/HashJoinProbeExec.cpp | 41 +++++++++++++++++++ dbms/src/DataStreams/HashJoinProbeExec.h | 13 ++++++ 4 files changed, 68 insertions(+), 24 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 01aff9f76d6..e99c7bd7488 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -28,7 +28,6 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( UInt64 max_block_size_) : log(Logger::get(req_id)) , original_join(join_) - , probe_process_info(max_block_size_) { children.push_back(input); @@ -145,10 +144,9 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin() void HashJoinProbeBlockInputStream::onAllProbeDone() { - if (probe_exec->need_output_non_joined_data) + if (probe_exec->needOutputNonJoinedData()) { - assert(probe_exec->non_joined_stream != nullptr); - probe_exec->non_joined_stream->readPrefix(); + probe_exec->onNonJoinedStart(); switchStatus(ProbeStatus::READ_NON_JOINED_DATA); } else @@ -166,38 +164,32 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() switch (status) { case ProbeStatus::WAIT_BUILD_FINISH: - probe_exec->join->waitUntilAllBuildFinished(); + probe_exec->waitUntilAllBuildFinished(); /// after Build finish, always go to Probe stage probe_exec->onProbeStart(); switchStatus(ProbeStatus::PROBE); break; case ProbeStatus::PROBE: { - if (probe_process_info.all_rows_joined_finish) + auto ret = probe_exec->probe(); + if (!ret) { - auto [partition_index, block] = probe_exec->getProbeBlock(); - if (!block) - { - onCurrentProbeDone(); - break; - } - else - { - probe_exec->join->checkTypes(block); - probe_process_info.resetBlock(std::move(block), partition_index); - } + onCurrentProbeDone(); + break; + } + else + { + joined_rows += ret.rows(); + return ret; } - auto ret = probe_exec->join->joinBlock(probe_process_info); - joined_rows += ret.rows(); - return ret; } case ProbeStatus::WAIT_PROBE_FINISH: - probe_exec->join->waitUntilAllProbeFinished(); + probe_exec->waitUntilAllProbeFinished(); onAllProbeDone(); break; case ProbeStatus::READ_NON_JOINED_DATA: { - auto block = probe_exec->non_joined_stream->read(); + auto block = probe_exec->fetchNonJoined(); non_joined_rows += block.rows(); if (!block) { @@ -213,7 +205,6 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() } case ProbeStatus::RESTORE_BUILD: { - probe_process_info.all_rows_joined_finish = true; probe_exec->restoreBuild(); switchStatus(ProbeStatus::WAIT_BUILD_FINISH); break; diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index 15ff167def4..dc0c0eb9bf3 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -128,7 +128,6 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream /// although read/write to those are almost only in 1 thread, but an exception is cancel thread will /// read them, so need to protect the multi-threads access HashJoinProbeExecHolder probe_exec; - ProbeProcessInfo probe_process_info; ProbeStatus status{ProbeStatus::WAIT_BUILD_FINISH}; size_t joined_rows = 0; size_t non_joined_rows = 0; diff --git a/dbms/src/DataStreams/HashJoinProbeExec.cpp b/dbms/src/DataStreams/HashJoinProbeExec.cpp index 5dc7e4a32b8..20dbe874e80 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.cpp +++ b/dbms/src/DataStreams/HashJoinProbeExec.cpp @@ -33,8 +33,19 @@ HashJoinProbeExec::HashJoinProbeExec( , non_joined_stream_index(non_joined_stream_index_) , non_joined_stream(non_joined_stream_) , max_block_size(max_block_size_) + , probe_process_info(max_block_size_) {} +void HashJoinProbeExec::waitUntilAllBuildFinished() +{ + join->waitUntilAllBuildFinished(); +} + +void HashJoinProbeExec::waitUntilAllProbeFinished() +{ + join->waitUntilAllProbeFinished(); +} + void HashJoinProbeExec::restoreBuild() { restore_build_stream->readPrefix(); @@ -78,6 +89,24 @@ std::tuple HashJoinProbeExec::getProbeBlock() return {partition_index, block}; } +Block HashJoinProbeExec::probe() +{ + if (probe_process_info.all_rows_joined_finish) + { + auto [partition_index, block] = getProbeBlock(); + if (!block) + { + return {}; + } + else + { + join->checkTypes(block); + probe_process_info.resetBlock(std::move(block), partition_index); + } + } + return join->joinBlock(probe_process_info); +} + std::optional HashJoinProbeExec::tryGetRestoreExec() { assert(join->isEnableSpill()); @@ -150,6 +179,18 @@ bool HashJoinProbeExec::onProbeFinish() return !need_output_non_joined_data && !join->isEnableSpill(); } +void HashJoinProbeExec::onNonJoinedStart() +{ + assert(non_joined_stream != nullptr); + non_joined_stream->readPrefix(); +} + +Block HashJoinProbeExec::fetchNonJoined() +{ + assert(non_joined_stream != nullptr); + return non_joined_stream->read(); +} + bool HashJoinProbeExec::onNonJoinedFinish() { non_joined_stream->readSuffix(); diff --git a/dbms/src/DataStreams/HashJoinProbeExec.h b/dbms/src/DataStreams/HashJoinProbeExec.h index 43e80567070..d9fc345b21d 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.h +++ b/dbms/src/DataStreams/HashJoinProbeExec.h @@ -40,6 +40,8 @@ class HashJoinProbeExec size_t max_block_size; + ProbeProcessInfo probe_process_info; + std::list> probe_partition_blocks; public: @@ -52,10 +54,19 @@ class HashJoinProbeExec const BlockInputStreamPtr & non_joined_stream_, size_t max_block_size_); + bool needOutputNonJoinedData() { return need_output_non_joined_data; } + + void waitUntilAllBuildFinished(); + + void waitUntilAllProbeFinished(); + void restoreBuild(); std::tuple getProbeBlock(); + // Returns empty block if probe finish. + Block probe(); + std::optional tryGetRestoreExec(); void cancel(); @@ -67,6 +78,8 @@ class HashJoinProbeExec // Returns false if the probe_exec continues to execute. bool onProbeFinish(); + void onNonJoinedStart(); + Block fetchNonJoined(); // Returns true if the probe_exec ends. // Returns false if the probe_exec continues to execute. bool onNonJoinedFinish(); From aba0373f613ca7641b6d282c8c0ebf684b6d0c90 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Fri, 24 Mar 2023 15:01:23 +0800 Subject: [PATCH 05/10] u --- dbms/src/DataStreams/HashJoinProbeExec.h | 51 ++++++++++++------------ 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeExec.h b/dbms/src/DataStreams/HashJoinProbeExec.h index d9fc345b21d..76b8efbaab1 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.h +++ b/dbms/src/DataStreams/HashJoinProbeExec.h @@ -27,23 +27,6 @@ using HashJoinProbeExecPtr = std::shared_ptr; class HashJoinProbeExec { -public: - JoinPtr join; - - BlockInputStreamPtr restore_build_stream; - - BlockInputStreamPtr probe_stream; - - bool need_output_non_joined_data; - size_t non_joined_stream_index; - BlockInputStreamPtr non_joined_stream; - - size_t max_block_size; - - ProbeProcessInfo probe_process_info; - - std::list> probe_partition_blocks; - public: HashJoinProbeExec( const JoinPtr & join_, @@ -54,35 +37,51 @@ class HashJoinProbeExec const BlockInputStreamPtr & non_joined_stream_, size_t max_block_size_); - bool needOutputNonJoinedData() { return need_output_non_joined_data; } - void waitUntilAllBuildFinished(); void waitUntilAllProbeFinished(); - void restoreBuild(); - - std::tuple getProbeBlock(); - - // Returns empty block if probe finish. - Block probe(); - std::optional tryGetRestoreExec(); void cancel(); void meetError(const String & error_message); + void restoreBuild(); + void onProbeStart(); + // Returns empty block if probe finish. + Block probe(); // Returns true if the probe_exec ends. // Returns false if the probe_exec continues to execute. bool onProbeFinish(); + bool needOutputNonJoinedData() { return need_output_non_joined_data; } void onNonJoinedStart(); Block fetchNonJoined(); // Returns true if the probe_exec ends. // Returns false if the probe_exec continues to execute. bool onNonJoinedFinish(); + +private: + std::tuple getProbeBlock(); + +private: + JoinPtr join; + + BlockInputStreamPtr restore_build_stream; + + BlockInputStreamPtr probe_stream; + + bool need_output_non_joined_data; + size_t non_joined_stream_index; + BlockInputStreamPtr non_joined_stream; + + size_t max_block_size; + + ProbeProcessInfo probe_process_info; + + std::list> probe_partition_blocks; }; class HashJoinProbeExecHolder From 1e4f678548db44bef15f3dadba12f4893315172d Mon Sep 17 00:00:00 2001 From: SeaRise Date: Fri, 24 Mar 2023 16:05:23 +0800 Subject: [PATCH 06/10] update --- .../HashJoinProbeBlockInputStream.cpp | 79 +++++-------------- .../HashJoinProbeBlockInputStream.h | 9 ++- dbms/src/DataStreams/HashJoinProbeExec.cpp | 57 ++++++++++++- dbms/src/DataStreams/HashJoinProbeExec.h | 15 +++- 4 files changed, 94 insertions(+), 66 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index e99c7bd7488..94c6f34e348 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -34,20 +34,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( RUNTIME_CHECK_MSG(original_join != nullptr, "join ptr should not be null."); RUNTIME_CHECK_MSG(original_join->getProbeConcurrency() > 0, "Join probe concurrency must be greater than 0"); - bool need_output_non_joined_data = original_join->needReturnNonJoinedData(); - BlockInputStreamPtr non_joined_stream = nullptr; - if (need_output_non_joined_data) - non_joined_stream = original_join->createStreamWithNonJoinedRows(input->getHeader(), non_joined_stream_index, original_join->getProbeConcurrency(), max_block_size_); - - auto cur_probe_exec = std::make_shared( - original_join, - nullptr, - input, - need_output_non_joined_data, - non_joined_stream_index, - non_joined_stream, - max_block_size_); - probe_exec.set(std::move(cur_probe_exec)); + probe_exec.set(HashJoinProbeExec::build(original_join, input, non_joined_stream_index, max_block_size_)); } void HashJoinProbeBlockInputStream::readSuffixImpl() @@ -71,17 +58,6 @@ void HashJoinProbeBlockInputStream::cancel(bool kill) /// When the probe stream quits probe by cancelling instead of normal finish, the Join operator might still produce meaningless blocks /// and expects these meaningless blocks won't be used to produce meaningful result. - /// Cancel join just wake up all the threads waiting in Join::waitUntilAllBuildFinished/Join::waitUntilAllProbeFinished, - /// the ongoing join process will not be interrupted - /// There is a little bit hack here because cancel will be called in two cases: - /// 1. the query is cancelled by the caller or meet error: in this case, wake up all waiting threads is safe - /// 2. the query is executed normally, and one of the data stream has read an empty block, the the data stream and all its children - /// will call `cancel(false)`, in this case, there is two sub-cases - /// a. the data stream read an empty block because of EOF, then it means there must be no threads waiting in Join, so cancel the join is safe - /// b. the data stream read an empty block because of early exit of some executor(like limit), in this case, just wake the waiting - /// threads is not 100% safe because if the probe thread is wake up when build is not finished yet, it may produce wrong results, for now - /// it is safe because when any of the data stream read empty block because of early exit, the execution framework ensures that no further - /// data will be used. probe_exec->cancel(); } @@ -108,45 +84,25 @@ void HashJoinProbeBlockInputStream::onCurrentReadNonJoinedDataDone() void HashJoinProbeBlockInputStream::tryGetRestoreJoin() { - /// find restore join in DFS way - while (true) + auto cur_probe_exec = *probe_exec; + auto restore_probe_exec = cur_probe_exec->tryGetRestoreExec([&]() { return isCancelledOrThrowIfKilled(); }); + if (restore_probe_exec.has_value() && !isCancelledOrThrowIfKilled()) { - if (isCancelledOrThrowIfKilled()) - { - switchStatus(ProbeStatus::FINISHED); - return; - } - - auto cur_probe_exec = *probe_exec; - auto restore_probe_exec = cur_probe_exec->tryGetRestoreExec(); - if (restore_probe_exec.has_value()) - { - parents.push_back(std::move(cur_probe_exec)); - probe_exec.set(std::move(*restore_probe_exec)); - switchStatus(ProbeStatus::RESTORE_BUILD); - return; - } - /// current join has no more partition to restore, so check if previous join still has partition to restore - if (!parents.empty()) - { - /// replace current join with previous join - probe_exec.set(std::move(parents.back())); - parents.pop_back(); - } - else - { - /// no previous join, set status to FINISHED - switchStatus(ProbeStatus::FINISHED); - return; - } + probe_exec.set(std::move(*restore_probe_exec)); + switchStatus(ProbeStatus::RESTORE_BUILD); + } + else + { + switchStatus(ProbeStatus::FINISHED); } } void HashJoinProbeBlockInputStream::onAllProbeDone() { - if (probe_exec->needOutputNonJoinedData()) + const auto & cur_probe_exec = *probe_exec; + if (cur_probe_exec->needOutputNonJoinedData()) { - probe_exec->onNonJoinedStart(); + cur_probe_exec->onNonJoinedStart(); switchStatus(ProbeStatus::READ_NON_JOINED_DATA); } else @@ -164,11 +120,14 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() switch (status) { case ProbeStatus::WAIT_BUILD_FINISH: - probe_exec->waitUntilAllBuildFinished(); + { + const auto & cur_probe_exec = *probe_exec; + cur_probe_exec->waitUntilAllBuildFinished(); /// after Build finish, always go to Probe stage - probe_exec->onProbeStart(); + cur_probe_exec->onProbeStart(); switchStatus(ProbeStatus::PROBE); break; + } case ProbeStatus::PROBE: { auto ret = probe_exec->probe(); @@ -184,9 +143,11 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() } } case ProbeStatus::WAIT_PROBE_FINISH: + { probe_exec->waitUntilAllProbeFinished(); onAllProbeDone(); break; + } case ProbeStatus::READ_NON_JOINED_DATA: { auto block = probe_exec->fetchNonJoined(); diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index dc0c0eb9bf3..844b09ed177 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -122,11 +122,14 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream void onAllProbeDone(); void onCurrentReadNonJoinedDataDone(); void tryGetRestoreJoin(); + +private: const LoggerPtr log; JoinPtr original_join; - /// join/non_joined_stream/restore_build_stream/restore_probe_stream can be modified during the runtime - /// although read/write to those are almost only in 1 thread, but an exception is cancel thread will - /// read them, so need to protect the multi-threads access + /// probe_exec can be modified during the runtime, + /// although read/write to those are almost only in 1 thread, + /// but an exception is cancel thread will read them, + /// so need to use HashJoinProbeExecHolder protect the multi-threads access. HashJoinProbeExecHolder probe_exec; ProbeStatus status{ProbeStatus::WAIT_BUILD_FINISH}; size_t joined_rows = 0; diff --git a/dbms/src/DataStreams/HashJoinProbeExec.cpp b/dbms/src/DataStreams/HashJoinProbeExec.cpp index 20dbe874e80..ccac05bbc5e 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.cpp +++ b/dbms/src/DataStreams/HashJoinProbeExec.cpp @@ -18,6 +18,27 @@ namespace DB { +HashJoinProbeExecPtr HashJoinProbeExec::build( + const JoinPtr & join, + const BlockInputStreamPtr & probe_stream, + size_t non_joined_stream_index, + size_t max_block_size) +{ + bool need_output_non_joined_data = join->needReturnNonJoinedData(); + BlockInputStreamPtr non_joined_stream = nullptr; + if (need_output_non_joined_data) + non_joined_stream = join->createStreamWithNonJoinedRows(probe_stream->getHeader(), non_joined_stream_index, join->getProbeConcurrency(), max_block_size); + + return std::make_shared( + join, + nullptr, + probe_stream, + need_output_non_joined_data, + non_joined_stream_index, + non_joined_stream, + max_block_size); +} + HashJoinProbeExec::HashJoinProbeExec( const JoinPtr & join_, const BlockInputStreamPtr & restore_build_stream_, @@ -107,7 +128,28 @@ Block HashJoinProbeExec::probe() return join->joinBlock(probe_process_info); } -std::optional HashJoinProbeExec::tryGetRestoreExec() +std::optional HashJoinProbeExec::tryGetRestoreExec(std::function && is_cancelled) +{ + /// find restore exec in DFS way + if (is_cancelled()) + return {}; + + auto ret = doTryGetRestoreExec(); + if (ret.has_value()) + return ret; + + /// current join has no more partition to restore, so check if previous join still has partition to restore + if (parent.has_value()) + { + return (*parent)->tryGetRestoreExec(std::move(is_cancelled)); + } + else + { + return {}; + } +} + +std::optional HashJoinProbeExec::doTryGetRestoreExec() { assert(join->isEnableSpill()); /// first check if current join has a partition to restore @@ -130,6 +172,7 @@ std::optional HashJoinProbeExec::tryGetRestoreExec() non_joined_stream_index, restore_info.non_joined_stream, max_block_size); + restore_probe_exec->parent = shared_from_this(); return {std::move(restore_probe_exec)}; } assert(join->hasPartitionSpilledWithLock() == false); @@ -139,6 +182,18 @@ std::optional HashJoinProbeExec::tryGetRestoreExec() void HashJoinProbeExec::cancel() { + /// Cancel join just wake up all the threads waiting in Join::waitUntilAllBuildFinished/Join::waitUntilAllProbeFinished, + /// the ongoing join process will not be interrupted + /// There is a little bit hack here because cancel will be called in two cases: + /// 1. the query is cancelled by the caller or meet error: in this case, wake up all waiting threads is safe + /// 2. the query is executed normally, and one of the data stream has read an empty block, the the data stream and all its children + /// will call `cancel(false)`, in this case, there is two sub-cases + /// a. the data stream read an empty block because of EOF, then it means there must be no threads waiting in Join, so cancel the join is safe + /// b. the data stream read an empty block because of early exit of some executor(like limit), in this case, just wake the waiting + /// threads is not 100% safe because if the probe thread is wake up when build is not finished yet, it may produce wrong results, for now + /// it is safe because when any of the data stream read empty block because of early exit, the execution framework ensures that no further + /// data will be used. + join->cancel(); if (non_joined_stream != nullptr) { diff --git a/dbms/src/DataStreams/HashJoinProbeExec.h b/dbms/src/DataStreams/HashJoinProbeExec.h index 76b8efbaab1..8dec3b2cc75 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.h +++ b/dbms/src/DataStreams/HashJoinProbeExec.h @@ -25,9 +25,15 @@ namespace DB class HashJoinProbeExec; using HashJoinProbeExecPtr = std::shared_ptr; -class HashJoinProbeExec +class HashJoinProbeExec : public std::enable_shared_from_this { public: + static HashJoinProbeExecPtr build( + const JoinPtr & join, + const BlockInputStreamPtr & probe_stream, + size_t non_joined_stream_index, + size_t max_block_size); + HashJoinProbeExec( const JoinPtr & join_, const BlockInputStreamPtr & restore_build_stream_, @@ -41,7 +47,7 @@ class HashJoinProbeExec void waitUntilAllProbeFinished(); - std::optional tryGetRestoreExec(); + std::optional tryGetRestoreExec(std::function && is_cancelled); void cancel(); @@ -66,6 +72,8 @@ class HashJoinProbeExec private: std::tuple getProbeBlock(); + std::optional doTryGetRestoreExec(); + private: JoinPtr join; @@ -80,8 +88,9 @@ class HashJoinProbeExec size_t max_block_size; ProbeProcessInfo probe_process_info; - std::list> probe_partition_blocks; + + std::optional parent; }; class HashJoinProbeExecHolder From 4619f6fc2c5814e624bd7b37db0537a8651e8983 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 27 Mar 2023 10:04:47 +0800 Subject: [PATCH 07/10] update --- .../HashJoinProbeBlockInputStream.cpp | 10 ++-- .../HashJoinProbeBlockInputStream.h | 2 - dbms/src/DataStreams/HashJoinProbeExec.cpp | 47 +++++++++++-------- dbms/src/DataStreams/HashJoinProbeExec.h | 31 ++++++++---- dbms/src/Interpreters/Join.cpp | 6 +-- dbms/src/Interpreters/Join.h | 2 +- 6 files changed, 60 insertions(+), 38 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 94c6f34e348..206a9ba18fb 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -35,6 +35,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( RUNTIME_CHECK_MSG(original_join->getProbeConcurrency() > 0, "Join probe concurrency must be greater than 0"); probe_exec.set(HashJoinProbeExec::build(original_join, input, non_joined_stream_index, max_block_size_)); + probe_exec->setCancellationHook([&]() { return isCancelledOrThrowIfKilled(); }); } void HashJoinProbeBlockInputStream::readSuffixImpl() @@ -85,7 +86,7 @@ void HashJoinProbeBlockInputStream::onCurrentReadNonJoinedDataDone() void HashJoinProbeBlockInputStream::tryGetRestoreJoin() { auto cur_probe_exec = *probe_exec; - auto restore_probe_exec = cur_probe_exec->tryGetRestoreExec([&]() { return isCancelledOrThrowIfKilled(); }); + auto restore_probe_exec = cur_probe_exec->tryGetRestoreExec(); if (restore_probe_exec.has_value() && !isCancelledOrThrowIfKilled()) { probe_exec.set(std::move(*restore_probe_exec)); @@ -99,7 +100,7 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin() void HashJoinProbeBlockInputStream::onAllProbeDone() { - const auto & cur_probe_exec = *probe_exec; + auto cur_probe_exec = *probe_exec; if (cur_probe_exec->needOutputNonJoinedData()) { cur_probe_exec->onNonJoinedStart(); @@ -117,11 +118,14 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() { while (true) { + if unlikely (isCancelledOrThrowIfKilled()) + return {}; + switch (status) { case ProbeStatus::WAIT_BUILD_FINISH: { - const auto & cur_probe_exec = *probe_exec; + auto cur_probe_exec = *probe_exec; cur_probe_exec->waitUntilAllBuildFinished(); /// after Build finish, always go to Probe stage cur_probe_exec->onProbeStart(); diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index 844b09ed177..e3c1a04a688 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -16,7 +16,6 @@ #include #include -#include #include namespace DB @@ -134,7 +133,6 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream ProbeStatus status{ProbeStatus::WAIT_BUILD_FINISH}; size_t joined_rows = 0; size_t non_joined_rows = 0; - std::list parents; }; } // namespace DB diff --git a/dbms/src/DataStreams/HashJoinProbeExec.cpp b/dbms/src/DataStreams/HashJoinProbeExec.cpp index ccac05bbc5e..ce619a3c382 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.cpp +++ b/dbms/src/DataStreams/HashJoinProbeExec.cpp @@ -70,7 +70,13 @@ void HashJoinProbeExec::waitUntilAllProbeFinished() void HashJoinProbeExec::restoreBuild() { restore_build_stream->readPrefix(); - while (restore_build_stream->read()) {}; + if unlikely (is_cancelled()) + return; + while (restore_build_stream->read()) + { + if unlikely (is_cancelled()) + return; + } restore_build_stream->readSuffix(); } @@ -89,6 +95,9 @@ std::tuple HashJoinProbeExec::getProbeBlock() { while (true) { + if unlikely (is_cancelled()) + return {0, {}}; + if (!probe_partition_blocks.empty()) { auto partition_block = probe_partition_blocks.front(); @@ -128,12 +137,12 @@ Block HashJoinProbeExec::probe() return join->joinBlock(probe_process_info); } -std::optional HashJoinProbeExec::tryGetRestoreExec(std::function && is_cancelled) +std::optional HashJoinProbeExec::tryGetRestoreExec() { - /// find restore exec in DFS way - if (is_cancelled()) + if unlikely (is_cancelled()) return {}; + /// find restore exec in DFS way auto ret = doTryGetRestoreExec(); if (ret.has_value()) return ret; @@ -141,7 +150,7 @@ std::optional HashJoinProbeExec::tryGetRestoreExec(std::fu /// current join has no more partition to restore, so check if previous join still has partition to restore if (parent.has_value()) { - return (*parent)->tryGetRestoreExec(std::move(is_cancelled)); + return (*parent)->tryGetRestoreExec(); } else { @@ -155,24 +164,27 @@ std::optional HashJoinProbeExec::doTryGetRestoreExec() /// first check if current join has a partition to restore if (join->hasPartitionSpilledWithLock()) { - auto restore_info = join->getOneRestoreStream(max_block_size); /// get a restore join - if (restore_info.join) + if (auto restore_info = join->getOneRestoreStream(max_block_size); restore_info) { /// restored join should always enable spill - assert(restore_info.join->isEnableSpill()); + assert(restore_info->join && restore_info->join->isEnableSpill()); size_t non_joined_stream_index = 0; if (need_output_non_joined_data) - non_joined_stream_index = dynamic_cast(restore_info.non_joined_stream.get())->getNonJoinedIndex(); + { + assert(restore_info->non_joined_stream); + non_joined_stream_index = dynamic_cast(restore_info->non_joined_stream.get())->getNonJoinedIndex(); + } auto restore_probe_exec = std::make_shared( - restore_info.join, - restore_info.build_stream, - restore_info.probe_stream, + restore_info->join, + restore_info->build_stream, + restore_info->probe_stream, need_output_non_joined_data, non_joined_stream_index, - restore_info.non_joined_stream, + restore_info->non_joined_stream, max_block_size); restore_probe_exec->parent = shared_from_this(); + restore_probe_exec->setCancellationHook(is_cancelled); return {std::move(restore_probe_exec)}; } assert(join->hasPartitionSpilledWithLock() == false); @@ -197,20 +209,17 @@ void HashJoinProbeExec::cancel() join->cancel(); if (non_joined_stream != nullptr) { - auto * p_stream = dynamic_cast(non_joined_stream.get()); - if (p_stream != nullptr) + if (auto * p_stream = dynamic_cast(non_joined_stream.get()); p_stream != nullptr) p_stream->cancel(false); } if (probe_stream != nullptr) { - auto * p_stream = dynamic_cast(probe_stream.get()); - if (p_stream != nullptr) + if (auto * p_stream = dynamic_cast(probe_stream.get()); p_stream != nullptr) p_stream->cancel(false); } if (restore_build_stream != nullptr) { - auto * p_stream = dynamic_cast(restore_build_stream.get()); - if (p_stream != nullptr) + if (auto * p_stream = dynamic_cast(restore_build_stream.get()); p_stream != nullptr) p_stream->cancel(false); } } diff --git a/dbms/src/DataStreams/HashJoinProbeExec.h b/dbms/src/DataStreams/HashJoinProbeExec.h index 8dec3b2cc75..05141a8304d 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.h +++ b/dbms/src/DataStreams/HashJoinProbeExec.h @@ -34,6 +34,8 @@ class HashJoinProbeExec : public std::enable_shared_from_this size_t non_joined_stream_index, size_t max_block_size); + using CancellationHook = std::function; + HashJoinProbeExec( const JoinPtr & join_, const BlockInputStreamPtr & restore_build_stream_, @@ -47,7 +49,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this void waitUntilAllProbeFinished(); - std::optional tryGetRestoreExec(std::function && is_cancelled); + std::optional tryGetRestoreExec(); void cancel(); @@ -69,23 +71,32 @@ class HashJoinProbeExec : public std::enable_shared_from_this // Returns false if the probe_exec continues to execute. bool onNonJoinedFinish(); + void setCancellationHook(CancellationHook cancellation_hook) + { + is_cancelled = std::move(cancellation_hook); + } + private: std::tuple getProbeBlock(); std::optional doTryGetRestoreExec(); private: - JoinPtr join; + const JoinPtr join; + + const BlockInputStreamPtr restore_build_stream; - BlockInputStreamPtr restore_build_stream; + const BlockInputStreamPtr probe_stream; - BlockInputStreamPtr probe_stream; + const bool need_output_non_joined_data; + const size_t non_joined_stream_index; + const BlockInputStreamPtr non_joined_stream; - bool need_output_non_joined_data; - size_t non_joined_stream_index; - BlockInputStreamPtr non_joined_stream; + const size_t max_block_size; - size_t max_block_size; + CancellationHook is_cancelled{[]() { + return false; + }}; ProbeProcessInfo probe_process_info; std::list> probe_partition_blocks; @@ -96,14 +107,14 @@ class HashJoinProbeExec : public std::enable_shared_from_this class HashJoinProbeExecHolder { public: - const HashJoinProbeExecPtr & operator->() + HashJoinProbeExecPtr operator->() { std::lock_guard lock(mu); assert(exec); return exec; } - const HashJoinProbeExecPtr & operator*() + HashJoinProbeExecPtr operator*() { std::lock_guard lock(mu); assert(exec); diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 7ebff808a37..9e7e3643934 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -3118,7 +3118,7 @@ bool Join::hasPartitionSpilled() return !spilled_partition_indexes.empty(); } -RestoreInfo Join::getOneRestoreStream(size_t max_block_size) +std::optional Join::getOneRestoreStream(size_t max_block_size) { std::unique_lock lock(build_probe_mutex); if (meet_error) @@ -3141,7 +3141,7 @@ RestoreInfo Join::getOneRestoreStream(size_t max_block_size) { spilled_partition_indexes.pop_front(); } - return {restore_join, non_joined_data_stream, build_stream, probe_stream}; + return RestoreInfo{restore_join, non_joined_data_stream, build_stream, probe_stream}; } if (spilled_partition_indexes.empty()) { @@ -3179,7 +3179,7 @@ RestoreInfo Join::getOneRestoreStream(size_t max_block_size) restore_non_joined_data_streams[i] = restore_join->createStreamWithNonJoinedRows(probe_stream->getHeader(), i, restore_join_build_concurrency, max_block_size); } auto non_joined_data_stream = get_back_stream(restore_non_joined_data_streams); - return {restore_join, non_joined_data_stream, build_stream, probe_stream}; + return RestoreInfo{restore_join, non_joined_data_stream, build_stream, probe_stream}; } catch (...) { diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 49cc01be6e8..69d1b589736 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -155,7 +155,7 @@ class Join bool isSpilled() const { return is_spilled; } - RestoreInfo getOneRestoreStream(size_t max_block_size); + std::optional getOneRestoreStream(size_t max_block_size); void dispatchProbeBlock(Block & block, std::list> & partition_blocks_list); From 3ec15c4272b88e9fe683e50fa56cb197b64ca677 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 27 Mar 2023 14:20:08 +0800 Subject: [PATCH 08/10] udpate --- dbms/src/DataStreams/HashJoinProbeExec.cpp | 28 +++--- dbms/src/DataStreams/HashJoinProbeExec.h | 4 +- dbms/src/Interpreters/Join.cpp | 27 +++--- dbms/src/Interpreters/Join.h | 99 ++++++++++++++-------- 4 files changed, 90 insertions(+), 68 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeExec.cpp b/dbms/src/DataStreams/HashJoinProbeExec.cpp index ce619a3c382..18680bc8c1c 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.cpp +++ b/dbms/src/DataStreams/HashJoinProbeExec.cpp @@ -80,31 +80,26 @@ void HashJoinProbeExec::restoreBuild() restore_build_stream->readSuffix(); } -std::tuple HashJoinProbeExec::getProbeBlock() +PartitionBlock HashJoinProbeExec::getProbeBlock() { - size_t partition_index = 0; - Block block; - /// Even if spill is enabled, if spill is not triggered during build, /// there is no need to dispatch probe block if (!join->isSpilled()) { - block = probe_stream->read(); + return PartitionBlock{probe_stream->read()}; } else { while (true) { if unlikely (is_cancelled()) - return {0, {}}; + return {}; if (!probe_partition_blocks.empty()) { - auto partition_block = probe_partition_blocks.front(); + auto partition_block = std::move(probe_partition_blocks.front()); probe_partition_blocks.pop_front(); - partition_index = std::get<0>(partition_block); - block = std::get<1>(partition_block); - break; + return partition_block; } else { @@ -112,26 +107,25 @@ std::tuple HashJoinProbeExec::getProbeBlock() if (new_block) join->dispatchProbeBlock(new_block, probe_partition_blocks); else - break; + return {}; } } } - return {partition_index, block}; } Block HashJoinProbeExec::probe() { if (probe_process_info.all_rows_joined_finish) { - auto [partition_index, block] = getProbeBlock(); - if (!block) + auto partition_block = getProbeBlock(); + if (partition_block) { - return {}; + join->checkTypes(partition_block.block); + probe_process_info.resetBlock(std::move(partition_block.block), partition_block.partition_index); } else { - join->checkTypes(block); - probe_process_info.resetBlock(std::move(block), partition_index); + return {}; } } return join->joinBlock(probe_process_info); diff --git a/dbms/src/DataStreams/HashJoinProbeExec.h b/dbms/src/DataStreams/HashJoinProbeExec.h index 05141a8304d..d0d9052c4d2 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.h +++ b/dbms/src/DataStreams/HashJoinProbeExec.h @@ -77,7 +77,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this } private: - std::tuple getProbeBlock(); + PartitionBlock getProbeBlock(); std::optional doTryGetRestoreExec(); @@ -99,7 +99,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this }}; ProbeProcessInfo probe_process_info; - std::list> probe_partition_blocks; + PartitionBlocks probe_partition_blocks; std::optional parent; }; diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index f8e4164ff39..b52833504f5 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -2709,19 +2709,20 @@ void Join::joinBlockImplNullAware(Block & block, const Maps & maps) const void Join::checkTypesOfKeys(const Block & block_left, const Block & block_right) const { size_t keys_size = key_names_left.size(); - for (size_t i = 0; i < keys_size; ++i) { /// Compare up to Nullability. - DataTypePtr left_type = removeNullable(block_left.getByName(key_names_left[i]).type); DataTypePtr right_type = removeNullable(block_right.getByName(key_names_right[i]).type); - - if (!left_type->equals(*right_type)) - throw Exception("Type mismatch of columns to JOIN by: " - + key_names_left[i] + " " + left_type->getName() + " at left, " - + key_names_right[i] + " " + right_type->getName() + " at right", - ErrorCodes::TYPE_MISMATCH); + if unlikely (!left_type->equals(*right_type)) + throw Exception( + fmt::format( + "Type mismatch of columns to JOIN by: {} {} at left, {} {} at right", + key_names_left[i], + left_type->getName(), + key_names_right[i], + right_type->getName()), + ErrorCodes::TYPE_MISMATCH); } } @@ -3149,7 +3150,7 @@ std::optional Join::getOneRestoreStream(size_t max_block_size_) { spilled_partition_indexes.pop_front(); } - return RestoreInfo{restore_join, non_joined_data_stream, build_stream, probe_stream}; + return RestoreInfo{restore_join, std::move(non_joined_data_stream), std::move(build_stream), std::move(probe_stream)}; } if (spilled_partition_indexes.empty()) { @@ -3187,7 +3188,7 @@ std::optional Join::getOneRestoreStream(size_t max_block_size_) restore_non_joined_data_streams[i] = restore_join->createStreamWithNonJoinedRows(probe_stream->getHeader(), i, restore_join_build_concurrency, max_block_size_); } auto non_joined_data_stream = get_back_stream(restore_non_joined_data_streams); - return RestoreInfo{restore_join, non_joined_data_stream, build_stream, probe_stream}; + return RestoreInfo{restore_join, std::move(non_joined_data_stream), std::move(build_stream), std::move(probe_stream)}; } catch (...) { @@ -3200,7 +3201,7 @@ std::optional Join::getOneRestoreStream(size_t max_block_size_) } } -void Join::dispatchProbeBlock(Block & block, std::list> & partition_blocks_list) +void Join::dispatchProbeBlock(Block & block, PartitionBlocks & partition_blocks_list) { Blocks partition_blocks = dispatchBlock(key_names_left, block); for (size_t i = 0; i < partition_blocks.size(); ++i) @@ -3223,7 +3224,9 @@ void Join::dispatchProbeBlock(Block & block, std::list probe_spiller->spillBlocks(std::move(blocks_to_spill), i); } else - partition_blocks_list.push_back({i, partition_blocks[i]}); + { + partition_blocks_list.emplace_back(i, std::move(partition_blocks[i])); + } } } diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 288612cda7b..2c6df7475a9 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -34,8 +34,67 @@ namespace DB { -struct ProbeProcessInfo; -struct RestoreInfo; +class Join; +using JoinPtr = std::shared_ptr; +using Joins = std::vector; + +struct RestoreInfo +{ + JoinPtr join; + BlockInputStreamPtr non_joined_stream; + BlockInputStreamPtr build_stream; + BlockInputStreamPtr probe_stream; + + RestoreInfo(JoinPtr & join_, BlockInputStreamPtr && non_joined_data_stream_, BlockInputStreamPtr && build_stream_, BlockInputStreamPtr && probe_stream_) + : join(join_) + , non_joined_stream(std::move(non_joined_data_stream_)) + , build_stream(std::move(build_stream_)) + , probe_stream(std::move(probe_stream_)) + {} +}; + +struct PartitionBlock +{ + size_t partition_index; + Block block; + + PartitionBlock() + : partition_index(0) + , block({}) + {} + + explicit PartitionBlock(Block && block_) + : partition_index(0) + , block(std::move(block_)) + {} + + PartitionBlock(size_t partition_index_, Block && block_) + : partition_index(partition_index_) + , block(std::move(block_)) + {} + + explicit operator bool() const { return static_cast(block); } + bool operator!() const { return !block; } +}; +using PartitionBlocks = std::list; + +struct ProbeProcessInfo +{ + Block block; + size_t partition_index; + UInt64 max_block_size; + size_t start_row; + size_t end_row; + bool all_rows_joined_finish; + + explicit ProbeProcessInfo(UInt64 max_block_size_) + : max_block_size(max_block_size_) + , all_rows_joined_finish(true) + {} + + void resetBlock(Block && block_, size_t partition_index_ = 0); + void updateStartRow(); +}; /** Data structure for implementation of JOIN. * It is just a hash table: keys -> rows of joined ("right") table. @@ -90,8 +149,6 @@ struct RestoreInfo; * Always generate Nullable column and substitute NULLs for non-joined rows, * as in standard SQL. */ -using JoinPtr = std::shared_ptr; -using Joins = std::vector; class Join { @@ -158,7 +215,7 @@ class Join std::optional getOneRestoreStream(size_t max_block_size); - void dispatchProbeBlock(Block & block, std::list> & partition_blocks_list); + void dispatchProbeBlock(Block & block, PartitionBlocks & partition_blocks_list); Blocks dispatchBlock(const Strings & key_columns_names, const Block & from_block); @@ -521,38 +578,6 @@ class Join void joinBlockImplNullAware(Block & block, const Maps & maps) const; }; -struct RestoreInfo -{ - JoinPtr join; - BlockInputStreamPtr non_joined_stream; - BlockInputStreamPtr build_stream; - BlockInputStreamPtr probe_stream; - - RestoreInfo() = default; - RestoreInfo(JoinPtr & join_, BlockInputStreamPtr non_joined_data_stream_, BlockInputStreamPtr build_stream_, BlockInputStreamPtr probe_stream_) - : join(join_) - , non_joined_stream(non_joined_data_stream_) - , build_stream(build_stream_) - , probe_stream(probe_stream_){}; -}; - -struct ProbeProcessInfo -{ - Block block; - size_t partition_index; - UInt64 max_block_size; - size_t start_row; - size_t end_row; - bool all_rows_joined_finish; - - explicit ProbeProcessInfo(UInt64 max_block_size_) - : max_block_size(max_block_size_) - , all_rows_joined_finish(true){}; - - void resetBlock(Block && block_, size_t partition_index_ = 0); - void updateStartRow(); -}; - void convertColumnToNullable(ColumnWithTypeAndName & column); } // namespace DB From c25ab082ab3677436ccd80f50ba11f4f4fe93115 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 6 Apr 2023 03:03:49 +0800 Subject: [PATCH 09/10] update --- .../PtrHolder.h} | 35 +++++++++++-------- .../HashJoinProbeBlockInputStream.cpp | 15 ++++---- dbms/src/DataStreams/HashJoinProbeExec.cpp | 3 +- dbms/src/DataStreams/HashJoinProbeExec.h | 30 ++-------------- .../WNEstablishDisaggTaskHandler.cpp | 1 - .../WNEstablishDisaggTaskHandler.h | 2 +- dbms/src/Flash/Executor/QueryExecutor.h | 2 ++ dbms/src/Flash/Mpp/MPPTask.h | 2 +- 8 files changed, 35 insertions(+), 55 deletions(-) rename dbms/src/{Flash/Executor/QueryExecutorHolder.h => Common/PtrHolder.h} (61%) diff --git a/dbms/src/Flash/Executor/QueryExecutorHolder.h b/dbms/src/Common/PtrHolder.h similarity index 61% rename from dbms/src/Flash/Executor/QueryExecutorHolder.h rename to dbms/src/Common/PtrHolder.h index bc3b050fa64..bb96dc7fd21 100644 --- a/dbms/src/Flash/Executor/QueryExecutorHolder.h +++ b/dbms/src/Common/PtrHolder.h @@ -14,40 +14,47 @@ #pragma once -#include - #include +#include namespace DB { -class QueryExecutorHolder +template +class PtrHolder { public: - void set(QueryExecutorPtr && query_executor_) + void set(Ptr && obj_) { + assert(obj_); std::lock_guard lock(mu); - assert(!query_executor); - query_executor = std::move(query_executor_); + obj = std::move(obj_); } - std::optional tryGet() + auto tryGet() { - std::optional res; + std::optional res; std::lock_guard lock(mu); - if (query_executor != nullptr) - res.emplace(query_executor.get()); + if (obj != nullptr) + res.emplace(obj.get()); return res; } - QueryExecutor * operator->() + auto * operator->() + { + std::lock_guard lock(mu); + assert(obj != nullptr); + return obj.get(); + } + + auto & operator*() { std::lock_guard lock(mu); - assert(query_executor != nullptr); - return query_executor.get(); + assert(obj != nullptr); + return *obj.get(); } private: std::mutex mu; - QueryExecutorPtr query_executor; + Ptr obj; }; } // namespace DB diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 206a9ba18fb..f72a32c027d 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -85,8 +85,7 @@ void HashJoinProbeBlockInputStream::onCurrentReadNonJoinedDataDone() void HashJoinProbeBlockInputStream::tryGetRestoreJoin() { - auto cur_probe_exec = *probe_exec; - auto restore_probe_exec = cur_probe_exec->tryGetRestoreExec(); + auto restore_probe_exec = probe_exec->tryGetRestoreExec(); if (restore_probe_exec.has_value() && !isCancelledOrThrowIfKilled()) { probe_exec.set(std::move(*restore_probe_exec)); @@ -100,10 +99,10 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin() void HashJoinProbeBlockInputStream::onAllProbeDone() { - auto cur_probe_exec = *probe_exec; - if (cur_probe_exec->needOutputNonJoinedData()) + auto & cur_probe_exec = *probe_exec; + if (cur_probe_exec.needOutputNonJoinedData()) { - cur_probe_exec->onNonJoinedStart(); + cur_probe_exec.onNonJoinedStart(); switchStatus(ProbeStatus::READ_NON_JOINED_DATA); } else @@ -125,10 +124,10 @@ Block HashJoinProbeBlockInputStream::getOutputBlock() { case ProbeStatus::WAIT_BUILD_FINISH: { - auto cur_probe_exec = *probe_exec; - cur_probe_exec->waitUntilAllBuildFinished(); + auto & cur_probe_exec = *probe_exec; + cur_probe_exec.waitUntilAllBuildFinished(); /// after Build finish, always go to Probe stage - cur_probe_exec->onProbeStart(); + cur_probe_exec.onProbeStart(); switchStatus(ProbeStatus::PROBE); break; } diff --git a/dbms/src/DataStreams/HashJoinProbeExec.cpp b/dbms/src/DataStreams/HashJoinProbeExec.cpp index 18680bc8c1c..ff0ca60fb66 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.cpp +++ b/dbms/src/DataStreams/HashJoinProbeExec.cpp @@ -137,8 +137,7 @@ std::optional HashJoinProbeExec::tryGetRestoreExec() return {}; /// find restore exec in DFS way - auto ret = doTryGetRestoreExec(); - if (ret.has_value()) + if (auto ret = doTryGetRestoreExec(); ret.has_value()) return ret; /// current join has no more partition to restore, so check if previous join still has partition to restore diff --git a/dbms/src/DataStreams/HashJoinProbeExec.h b/dbms/src/DataStreams/HashJoinProbeExec.h index d0d9052c4d2..58f65b044d8 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.h +++ b/dbms/src/DataStreams/HashJoinProbeExec.h @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -104,32 +105,5 @@ class HashJoinProbeExec : public std::enable_shared_from_this std::optional parent; }; -class HashJoinProbeExecHolder -{ -public: - HashJoinProbeExecPtr operator->() - { - std::lock_guard lock(mu); - assert(exec); - return exec; - } - - HashJoinProbeExecPtr operator*() - { - std::lock_guard lock(mu); - assert(exec); - return exec; - } - - void set(HashJoinProbeExecPtr && new_one) - { - assert(new_one); - std::lock_guard lock(mu); - exec = new_one; - } - -private: - std::mutex mu; - HashJoinProbeExecPtr exec; -}; +using HashJoinProbeExecHolder = PtrHolder; } // namespace DB diff --git a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp index 22b3ee809bf..4e2d34f81ee 100644 --- a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp +++ b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.h b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.h index 88d0c13cbc8..dd89398da34 100644 --- a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.h +++ b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include #include #include diff --git a/dbms/src/Flash/Executor/QueryExecutor.h b/dbms/src/Flash/Executor/QueryExecutor.h index 9c90d2e968d..2a7f830682f 100644 --- a/dbms/src/Flash/Executor/QueryExecutor.h +++ b/dbms/src/Flash/Executor/QueryExecutor.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include #include @@ -69,4 +70,5 @@ class QueryExecutor }; using QueryExecutorPtr = std::unique_ptr; +using QueryExecutorHolder = PtrHolder; } // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index 302b64fff87..21049644a3e 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -16,7 +16,7 @@ #include #include -#include +#include #include #include #include From 7fb7694a4907c8f2f978c538785c15fb46c2df3f Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 10 Apr 2023 11:44:30 +0800 Subject: [PATCH 10/10] merge master and address comment --- .../HashJoinProbeBlockInputStream.cpp | 5 ++--- dbms/src/DataStreams/HashJoinProbeExec.cpp | 17 +++++------------ dbms/src/DataStreams/HashJoinProbeExec.h | 6 +++--- dbms/src/Interpreters/Join.h | 18 ------------------ 4 files changed, 10 insertions(+), 36 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 69221f808e3..df59cefe618 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -82,10 +82,9 @@ void HashJoinProbeBlockInputStream::onCurrentReadNonJoinedDataDone() void HashJoinProbeBlockInputStream::tryGetRestoreJoin() { - auto restore_probe_exec = probe_exec->tryGetRestoreExec(); - if (restore_probe_exec.has_value() && !isCancelledOrThrowIfKilled()) + if (auto restore_probe_exec = probe_exec->tryGetRestoreExec(); restore_probe_exec && unlikely(!isCancelledOrThrowIfKilled())) { - probe_exec.set(std::move(*restore_probe_exec)); + probe_exec.set(std::move(restore_probe_exec)); switchStatus(ProbeStatus::RESTORE_BUILD); } else diff --git a/dbms/src/DataStreams/HashJoinProbeExec.cpp b/dbms/src/DataStreams/HashJoinProbeExec.cpp index 1251aa2023b..6954524ede9 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.cpp +++ b/dbms/src/DataStreams/HashJoinProbeExec.cpp @@ -131,27 +131,20 @@ Block HashJoinProbeExec::probe() return join->joinBlock(probe_process_info); } -std::optional HashJoinProbeExec::tryGetRestoreExec() +HashJoinProbeExecPtr HashJoinProbeExec::tryGetRestoreExec() { if unlikely (is_cancelled()) return {}; /// find restore exec in DFS way - if (auto ret = doTryGetRestoreExec(); ret.has_value()) + if (auto ret = doTryGetRestoreExec(); ret) return ret; /// current join has no more partition to restore, so check if previous join still has partition to restore - if (parent.has_value()) - { - return (*parent)->tryGetRestoreExec(); - } - else - { - return {}; - } + return parent ? parent->tryGetRestoreExec() : HashJoinProbeExecPtr{}; } -std::optional HashJoinProbeExec::doTryGetRestoreExec() +HashJoinProbeExecPtr HashJoinProbeExec::doTryGetRestoreExec() { assert(join->isEnableSpill()); /// first check if current join has a partition to restore @@ -178,7 +171,7 @@ std::optional HashJoinProbeExec::doTryGetRestoreExec() max_block_size); restore_probe_exec->parent = shared_from_this(); restore_probe_exec->setCancellationHook(is_cancelled); - return {std::move(restore_probe_exec)}; + return restore_probe_exec; } assert(join->hasPartitionSpilledWithLock() == false); } diff --git a/dbms/src/DataStreams/HashJoinProbeExec.h b/dbms/src/DataStreams/HashJoinProbeExec.h index 58f65b044d8..2fa1f215382 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.h +++ b/dbms/src/DataStreams/HashJoinProbeExec.h @@ -50,7 +50,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this void waitUntilAllProbeFinished(); - std::optional tryGetRestoreExec(); + HashJoinProbeExecPtr tryGetRestoreExec(); void cancel(); @@ -80,7 +80,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this private: PartitionBlock getProbeBlock(); - std::optional doTryGetRestoreExec(); + HashJoinProbeExecPtr doTryGetRestoreExec(); private: const JoinPtr join; @@ -102,7 +102,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this ProbeProcessInfo probe_process_info; PartitionBlocks probe_partition_blocks; - std::optional parent; + HashJoinProbeExecPtr parent; }; using HashJoinProbeExecHolder = PtrHolder; diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 296edb9fcc2..d0d470cebb1 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -75,24 +75,6 @@ struct PartitionBlock }; using PartitionBlocks = std::list; -struct ProbeProcessInfo -{ - Block block; - size_t partition_index; - UInt64 max_block_size; - size_t start_row; - size_t end_row; - bool all_rows_joined_finish; - - explicit ProbeProcessInfo(UInt64 max_block_size_) - : max_block_size(max_block_size_) - , all_rows_joined_finish(true) - {} - - void resetBlock(Block && block_, size_t partition_index_ = 0); - void updateStartRow(); -}; - /** Data structure for implementation of JOIN. * It is just a hash table: keys -> rows of joined ("right") table. * Additionally, CROSS JOIN is supported: instead of hash table, it use just set of blocks without keys.