Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaRise committed Apr 7, 2023
2 parents 2eba7b2 + b8786b0 commit cf5deb8
Show file tree
Hide file tree
Showing 8 changed files with 1,188 additions and 1,066 deletions.
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void CreatingSetsBlockInputStream::createAll()
for (auto & elem : subqueries_for_sets)
{
if (elem.second.join)
elem.second.join->setInitActiveBuildConcurrency();
elem.second.join->setInitActiveBuildThreads();
}
}
Stopwatch watch;
Expand Down
13 changes: 5 additions & 8 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(

probe_exec.set(HashJoinProbeExec::build(original_join, input, non_joined_stream_index, max_block_size_));
probe_exec->setCancellationHook([&]() { return isCancelledOrThrowIfKilled(); });

ProbeProcessInfo header_probe_process_info(0);
header_probe_process_info.resetBlock(input->getHeader());
header = original_join->joinBlock(header_probe_process_info, true);
}

void HashJoinProbeBlockInputStream::readSuffixImpl()
Expand All @@ -45,19 +49,12 @@ void HashJoinProbeBlockInputStream::readSuffixImpl()

Block HashJoinProbeBlockInputStream::getHeader() const
{
Block res = children.back()->getHeader();
assert(res.rows() == 0);
ProbeProcessInfo header_probe_process_info(0);
header_probe_process_info.resetBlock(std::move(res));
/// use original_join here so we don't need add lock
return original_join->joinBlock(header_probe_process_info);
return header;
}

void HashJoinProbeBlockInputStream::cancel(bool kill)
{
IProfilingBlockInputStream::cancel(kill);
/// When the probe stream quits probe by cancelling instead of normal finish, the Join operator might still produce meaningless blocks
/// and expects these meaningless blocks won't be used to produce meaningful result.

probe_exec->cancel();
}
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
ProbeStatus status{ProbeStatus::WAIT_BUILD_FINISH};
size_t joined_rows = 0;
size_t non_joined_rows = 0;

Block header;
};

} // namespace DB
30 changes: 17 additions & 13 deletions dbms/src/DataStreams/HashJoinProbeExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,23 @@ std::optional<HashJoinProbeExecPtr> HashJoinProbeExec::doTryGetRestoreExec()

void HashJoinProbeExec::cancel()
{
/// Cancel join just wake up all the threads waiting in Join::waitUntilAllBuildFinished/Join::waitUntilAllProbeFinished,
/// the ongoing join process will not be interrupted
/// There is a little bit hack here because cancel will be called in two cases:
/// 1. the query is cancelled by the caller or meet error: in this case, wake up all waiting threads is safe
/// 2. the query is executed normally, and one of the data stream has read an empty block, the the data stream and all its children
/// will call `cancel(false)`, in this case, there is two sub-cases
/// a. the data stream read an empty block because of EOF, then it means there must be no threads waiting in Join, so cancel the join is safe
/// b. the data stream read an empty block because of early exit of some executor(like limit), in this case, just wake the waiting
/// threads is not 100% safe because if the probe thread is wake up when build is not finished yet, it may produce wrong results, for now
/// it is safe because when any of the data stream read empty block because of early exit, the execution framework ensures that no further
/// data will be used.

join->cancel();
/// Join::wakeUpAllWaitingThreads wakes up all the threads waiting in Join::waitUntilAllBuildFinished/waitUntilAllProbeFinished,
/// and once this function is called, all the subsequent call of Join::waitUntilAllBuildFinished/waitUntilAllProbeFinished will
/// skip waiting directly.
/// HashJoinProbeBlockInputStream::cancel will be called in two cases:
/// 1. the query is cancelled by the caller or meet error: in this case, wake up all waiting threads is safe, because no data
/// will be used data anymore
/// 2. the query is executed normally, and one of the data stream has read an empty block, the the data stream and all its
/// children will call `cancel(false)`, in this case, there is two sub-cases
/// a. the data stream read an empty block because of EOF, then it means there must be no threads waiting in Join, so wake
/// up all waiting threads is safe because actually there is no threads to be waken up
/// b. the data stream read an empty block because of early exit of some executor(like limit), in this case, waking up the
/// waiting threads is not 100% safe because if the probe thread is waken up when build is not finished yet, it may get
/// wrong result. Currently, the execution framework ensures that when any of the data stream read empty block because
/// of early exit, no further data will be used, and in order to make sure no wrong result is generated
/// - for threads reading joined data: will return empty block if build is not finished yet
/// - for threads reading non joined data: will return empty block if build or probe is not finished yet
join->wakeUpAllWaitingThreads();
if (non_joined_stream != nullptr)
{
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(non_joined_stream.get()); p_stream != nullptr)
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/DataStreams/NonJoinedBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,16 @@ Block NonJoinedBlockInputStream::readImpl()
/// If build concurrency is less than non join concurrency,
/// just return empty block for extra non joined block input stream read
if (unlikely(index >= parent.getBuildConcurrency()))
return Block();
return {};
if unlikely (parent.active_build_threads != 0 || parent.active_probe_threads != 0)
{
/// build/probe is not finished yet, the query must be cancelled, so just return {}
LOG_WARNING(parent.log, "NonJoinedBlock read without non zero active_build_threads/active_probe_threads, return empty block");
return {};
}
if (!parent.has_build_data_in_memory)
/// no build data in memory, the non joined result must be empty
return Block();
return {};

/// todo read data based on JoinPartition
if (add_not_mapped_rows)
Expand Down
57 changes: 37 additions & 20 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ Join::Join(
, key_names_left(key_names_left_)
, key_names_right(key_names_right_)
, build_concurrency(0)
, active_build_concurrency(0)
, active_build_threads(0)
, probe_concurrency(0)
, active_probe_concurrency(0)
, active_probe_threads(0)
, collators(collators_)
, non_equal_conditions(non_equal_conditions_)
, original_strictness(strictness)
Expand Down Expand Up @@ -613,7 +613,7 @@ void Join::setBuildConcurrencyAndInitJoinPartition(size_t build_concurrency_)
{
if (unlikely(build_concurrency > 0))
throw Exception("Logical error: `setBuildConcurrencyAndInitJoinPartition` shouldn't be called more than once", ErrorCodes::LOGICAL_ERROR);
/// do not set active_build_concurrency because in compile stage, `joinBlock` will be called to get generate header, if active_build_concurrency
/// do not set active_build_threads because in compile stage, `joinBlock` will be called to get generate header, if active_build_threads
/// is set here, `joinBlock` will hang when used to get header
build_concurrency = std::max(1, build_concurrency_);

Expand Down Expand Up @@ -2729,12 +2729,12 @@ void Join::checkTypesOfKeys(const Block & block_left, const Block & block_right)
void Join::finishOneBuild()
{
std::unique_lock lock(build_probe_mutex);
if (active_build_concurrency == 1)
if (active_build_threads == 1)
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build);
}
--active_build_concurrency;
if (active_build_concurrency == 0)
--active_build_threads;
if (active_build_threads == 0)
{
workAfterBuildFinish();
build_cv.notify_all();
Expand Down Expand Up @@ -2808,7 +2808,7 @@ void Join::waitUntilAllBuildFinished() const
{
std::unique_lock lock(build_probe_mutex);
build_cv.wait(lock, [&]() {
return active_build_concurrency == 0 || meet_error || is_canceled;
return active_build_threads == 0 || meet_error || skip_wait;
});
if (meet_error)
throw Exception(error_message);
Expand All @@ -2817,12 +2817,12 @@ void Join::waitUntilAllBuildFinished() const
void Join::finishOneProbe()
{
std::unique_lock lock(build_probe_mutex);
if (active_probe_concurrency == 1)
if (active_probe_threads == 1)
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_probe);
}
--active_probe_concurrency;
if (active_probe_concurrency == 0)
--active_probe_threads;
if (active_probe_threads == 0)
{
workAfterProbeFinish();
probe_cv.notify_all();
Expand All @@ -2833,7 +2833,7 @@ void Join::waitUntilAllProbeFinished() const
{
std::unique_lock lock(build_probe_mutex);
probe_cv.wait(lock, [&]() {
return active_probe_concurrency == 0 || meet_error || is_canceled;
return active_probe_threads == 0 || meet_error || skip_wait;
});
if (meet_error)
throw Exception(error_message);
Expand All @@ -2842,21 +2842,38 @@ void Join::waitUntilAllProbeFinished() const

void Join::finishOneNonJoin(size_t partition_index)
{
while (partition_index < build_concurrency)
if likely (active_build_threads == 0 && active_probe_threads == 0)
{
std::unique_lock partition_lock = partitions[partition_index]->lockPartition();
partitions[partition_index]->releaseBuildPartitionBlocks(partition_lock);
partitions[partition_index]->releaseProbePartitionBlocks(partition_lock);
if (!partitions[partition_index]->isSpill())
/// only clear hash table if not active build/probe threads
while (partition_index < build_concurrency)
{
releaseBuildPartitionHashTable(partition_index, partition_lock);
std::unique_lock partition_lock = partitions[partition_index]->lockPartition();
partitions[partition_index]->releaseBuildPartitionBlocks(partition_lock);
partitions[partition_index]->releaseProbePartitionBlocks(partition_lock);
if (!partitions[partition_index]->isSpill())
{
releaseBuildPartitionHashTable(partition_index, partition_lock);
}
partition_index += build_concurrency;
}
partition_index += build_concurrency;
}
}

Block Join::joinBlock(ProbeProcessInfo & probe_process_info) const
Block Join::joinBlock(ProbeProcessInfo & probe_process_info, bool dry_run) const
{
if unlikely (dry_run)
{
assert(probe_process_info.block.rows() == 0);
}
else
{
if unlikely (active_build_threads != 0)
{
/// build is not finished yet, the query must be cancelled, so just return {}
LOG_WARNING(log, "JoinBlock without non zero active_build_threads, return empty block");
return {};
}
}
std::shared_lock lock(rwlock);

probe_process_info.updateStartRow();
Expand Down Expand Up @@ -3170,7 +3187,7 @@ std::optional<RestoreInfo> Join::getOneRestoreStream(size_t max_block_size_)
auto new_max_bytes_before_external_join = static_cast<size_t>(max_bytes_before_external_join * (static_cast<double>(restore_join_build_concurrency) / build_concurrency));
restore_join = createRestoreJoin(std::max(1, new_max_bytes_before_external_join));
restore_join->initBuild(build_sample_block, restore_join_build_concurrency);
restore_join->setInitActiveBuildConcurrency();
restore_join->setInitActiveBuildThreads();
restore_join->initProbe(probe_sample_block, restore_join_build_concurrency);
for (Int64 i = 0; i < restore_join_build_concurrency; i++)
{
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class Join
/** Join data from the map (that was previously built by calls to insertFromBlock) to the block with data from "left" table.
* Could be called from different threads in parallel.
*/
Block joinBlock(ProbeProcessInfo & probe_process_info) const;
Block joinBlock(ProbeProcessInfo & probe_process_info, bool dry_run = false) const;

void checkTypes(const Block & block) const;

Expand Down Expand Up @@ -234,10 +234,10 @@ class Join

const Names & getLeftJoinKeys() const { return key_names_left; }

void setInitActiveBuildConcurrency()
void setInitActiveBuildThreads()
{
std::unique_lock lock(build_probe_mutex);
active_build_concurrency = getBuildConcurrency();
active_build_threads = getBuildConcurrency();
}

size_t getProbeConcurrency() const
Expand All @@ -249,13 +249,13 @@ class Join
{
std::unique_lock lock(build_probe_mutex);
probe_concurrency = concurrency;
active_probe_concurrency = probe_concurrency;
active_probe_threads = probe_concurrency;
}

void cancel()
void wakeUpAllWaitingThreads()
{
std::unique_lock lk(build_probe_mutex);
is_canceled = true;
skip_wait = true;
probe_cv.notify_all();
build_cv.notify_all();
}
Expand Down Expand Up @@ -423,13 +423,13 @@ class Join

mutable std::condition_variable build_cv;
size_t build_concurrency;
size_t active_build_concurrency;
std::atomic<size_t> active_build_threads;

mutable std::condition_variable probe_cv;
size_t probe_concurrency;
size_t active_probe_concurrency;
std::atomic<size_t> active_probe_threads;

bool is_canceled = false;
bool skip_wait = false;
bool meet_error = false;
String error_message;

Expand Down
Loading

0 comments on commit cf5deb8

Please sign in to comment.