From 7fb7694a4907c8f2f978c538785c15fb46c2df3f Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 10 Apr 2023 11:44:30 +0800 Subject: [PATCH] merge master and address comment --- .../HashJoinProbeBlockInputStream.cpp | 5 ++--- dbms/src/DataStreams/HashJoinProbeExec.cpp | 17 +++++------------ dbms/src/DataStreams/HashJoinProbeExec.h | 6 +++--- dbms/src/Interpreters/Join.h | 18 ------------------ 4 files changed, 10 insertions(+), 36 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 69221f808e3..df59cefe618 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -82,10 +82,9 @@ void HashJoinProbeBlockInputStream::onCurrentReadNonJoinedDataDone() void HashJoinProbeBlockInputStream::tryGetRestoreJoin() { - auto restore_probe_exec = probe_exec->tryGetRestoreExec(); - if (restore_probe_exec.has_value() && !isCancelledOrThrowIfKilled()) + if (auto restore_probe_exec = probe_exec->tryGetRestoreExec(); restore_probe_exec && unlikely(!isCancelledOrThrowIfKilled())) { - probe_exec.set(std::move(*restore_probe_exec)); + probe_exec.set(std::move(restore_probe_exec)); switchStatus(ProbeStatus::RESTORE_BUILD); } else diff --git a/dbms/src/DataStreams/HashJoinProbeExec.cpp b/dbms/src/DataStreams/HashJoinProbeExec.cpp index 1251aa2023b..6954524ede9 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.cpp +++ b/dbms/src/DataStreams/HashJoinProbeExec.cpp @@ -131,27 +131,20 @@ Block HashJoinProbeExec::probe() return join->joinBlock(probe_process_info); } -std::optional HashJoinProbeExec::tryGetRestoreExec() +HashJoinProbeExecPtr HashJoinProbeExec::tryGetRestoreExec() { if unlikely (is_cancelled()) return {}; /// find restore exec in DFS way - if (auto ret = doTryGetRestoreExec(); ret.has_value()) + if (auto ret = doTryGetRestoreExec(); ret) return ret; /// current join has no more partition to restore, so check if previous join still has partition to restore - if (parent.has_value()) - { - return (*parent)->tryGetRestoreExec(); - } - else - { - return {}; - } + return parent ? parent->tryGetRestoreExec() : HashJoinProbeExecPtr{}; } -std::optional HashJoinProbeExec::doTryGetRestoreExec() +HashJoinProbeExecPtr HashJoinProbeExec::doTryGetRestoreExec() { assert(join->isEnableSpill()); /// first check if current join has a partition to restore @@ -178,7 +171,7 @@ std::optional HashJoinProbeExec::doTryGetRestoreExec() max_block_size); restore_probe_exec->parent = shared_from_this(); restore_probe_exec->setCancellationHook(is_cancelled); - return {std::move(restore_probe_exec)}; + return restore_probe_exec; } assert(join->hasPartitionSpilledWithLock() == false); } diff --git a/dbms/src/DataStreams/HashJoinProbeExec.h b/dbms/src/DataStreams/HashJoinProbeExec.h index 58f65b044d8..2fa1f215382 100644 --- a/dbms/src/DataStreams/HashJoinProbeExec.h +++ b/dbms/src/DataStreams/HashJoinProbeExec.h @@ -50,7 +50,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this void waitUntilAllProbeFinished(); - std::optional tryGetRestoreExec(); + HashJoinProbeExecPtr tryGetRestoreExec(); void cancel(); @@ -80,7 +80,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this private: PartitionBlock getProbeBlock(); - std::optional doTryGetRestoreExec(); + HashJoinProbeExecPtr doTryGetRestoreExec(); private: const JoinPtr join; @@ -102,7 +102,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this ProbeProcessInfo probe_process_info; PartitionBlocks probe_partition_blocks; - std::optional parent; + HashJoinProbeExecPtr parent; }; using HashJoinProbeExecHolder = PtrHolder; diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 296edb9fcc2..d0d470cebb1 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -75,24 +75,6 @@ struct PartitionBlock }; using PartitionBlocks = std::list; -struct ProbeProcessInfo -{ - Block block; - size_t partition_index; - UInt64 max_block_size; - size_t start_row; - size_t end_row; - bool all_rows_joined_finish; - - explicit ProbeProcessInfo(UInt64 max_block_size_) - : max_block_size(max_block_size_) - , all_rows_joined_finish(true) - {} - - void resetBlock(Block && block_, size_t partition_index_ = 0); - void updateStartRow(); -}; - /** Data structure for implementation of JOIN. * It is just a hash table: keys -> rows of joined ("right") table. * Additionally, CROSS JOIN is supported: instead of hash table, it use just set of blocks without keys.