Skip to content

Commit

Permalink
merge master and address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaRise committed Apr 10, 2023
1 parent 59d6127 commit 7fb7694
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 36 deletions.
5 changes: 2 additions & 3 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,9 @@ void HashJoinProbeBlockInputStream::onCurrentReadNonJoinedDataDone()

void HashJoinProbeBlockInputStream::tryGetRestoreJoin()
{
auto restore_probe_exec = probe_exec->tryGetRestoreExec();
if (restore_probe_exec.has_value() && !isCancelledOrThrowIfKilled())
if (auto restore_probe_exec = probe_exec->tryGetRestoreExec(); restore_probe_exec && unlikely(!isCancelledOrThrowIfKilled()))
{
probe_exec.set(std::move(*restore_probe_exec));
probe_exec.set(std::move(restore_probe_exec));
switchStatus(ProbeStatus::RESTORE_BUILD);
}
else
Expand Down
17 changes: 5 additions & 12 deletions dbms/src/DataStreams/HashJoinProbeExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,27 +131,20 @@ Block HashJoinProbeExec::probe()
return join->joinBlock(probe_process_info);
}

std::optional<HashJoinProbeExecPtr> HashJoinProbeExec::tryGetRestoreExec()
HashJoinProbeExecPtr HashJoinProbeExec::tryGetRestoreExec()
{
if unlikely (is_cancelled())
return {};

/// find restore exec in DFS way
if (auto ret = doTryGetRestoreExec(); ret.has_value())
if (auto ret = doTryGetRestoreExec(); ret)
return ret;

/// current join has no more partition to restore, so check if previous join still has partition to restore
if (parent.has_value())
{
return (*parent)->tryGetRestoreExec();
}
else
{
return {};
}
return parent ? parent->tryGetRestoreExec() : HashJoinProbeExecPtr{};
}

std::optional<HashJoinProbeExecPtr> HashJoinProbeExec::doTryGetRestoreExec()
HashJoinProbeExecPtr HashJoinProbeExec::doTryGetRestoreExec()
{
assert(join->isEnableSpill());
/// first check if current join has a partition to restore
Expand All @@ -178,7 +171,7 @@ std::optional<HashJoinProbeExecPtr> HashJoinProbeExec::doTryGetRestoreExec()
max_block_size);
restore_probe_exec->parent = shared_from_this();
restore_probe_exec->setCancellationHook(is_cancelled);
return {std::move(restore_probe_exec)};
return restore_probe_exec;
}
assert(join->hasPartitionSpilledWithLock() == false);
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/DataStreams/HashJoinProbeExec.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this<HashJoinProbeExec>

void waitUntilAllProbeFinished();

std::optional<HashJoinProbeExecPtr> tryGetRestoreExec();
HashJoinProbeExecPtr tryGetRestoreExec();

void cancel();

Expand Down Expand Up @@ -80,7 +80,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this<HashJoinProbeExec>
private:
PartitionBlock getProbeBlock();

std::optional<HashJoinProbeExecPtr> doTryGetRestoreExec();
HashJoinProbeExecPtr doTryGetRestoreExec();

private:
const JoinPtr join;
Expand All @@ -102,7 +102,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this<HashJoinProbeExec>
ProbeProcessInfo probe_process_info;
PartitionBlocks probe_partition_blocks;

std::optional<HashJoinProbeExecPtr> parent;
HashJoinProbeExecPtr parent;
};

using HashJoinProbeExecHolder = PtrHolder<HashJoinProbeExecPtr>;
Expand Down
18 changes: 0 additions & 18 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,6 @@ struct PartitionBlock
};
using PartitionBlocks = std::list<PartitionBlock>;

struct ProbeProcessInfo
{
Block block;
size_t partition_index;
UInt64 max_block_size;
size_t start_row;
size_t end_row;
bool all_rows_joined_finish;

explicit ProbeProcessInfo(UInt64 max_block_size_)
: max_block_size(max_block_size_)
, all_rows_joined_finish(true)
{}

void resetBlock(Block && block_, size_t partition_index_ = 0);
void updateStartRow();
};

/** Data structure for implementation of JOIN.
* It is just a hash table: keys -> rows of joined ("right") table.
* Additionally, CROSS JOIN is supported: instead of hash table, it use just set of blocks without keys.
Expand Down

0 comments on commit 7fb7694

Please sign in to comment.