Skip to content

Commit

Permalink
put a global lock on insertFromBlock
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxin9014 committed Mar 6, 2023
1 parent e2e254e commit 29e2261
Showing 1 changed file with 71 additions and 58 deletions.
129 changes: 71 additions & 58 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,6 @@ void Join::setSampleBlock(const Block & block)

std::shared_ptr<Join> Join::createRestoreJoin()
{
RUNTIME_CHECK_MSG(restore_round < 5, "spill to disk repartition times should be smaller than 5.");
return std::make_shared<Join>(
key_names_left,
key_names_right,
Expand Down Expand Up @@ -862,8 +861,7 @@ void insertFromBlockImplType(
size_t insert_concurrency,
Arena & pool,
bool enable_fine_grained_shuffle,
bool enable_join_spill,
std::mutex & partition_lock)
bool enable_join_spill)
{
if (null_map)
{
Expand All @@ -873,7 +871,6 @@ void insertFromBlockImplType(
}
else if (enable_join_spill)
{
std::lock_guard lk(partition_lock);
if (map.isSegmentRelease(stream_index))
{
return;
Expand All @@ -895,7 +892,6 @@ void insertFromBlockImplType(
}
else if (enable_join_spill)
{
std::lock_guard lk(map.getSegmentMutex(stream_index));
if (map.isSegmentRelease(stream_index))
{
return;
Expand Down Expand Up @@ -926,8 +922,7 @@ void insertFromBlockImpl(
size_t insert_concurrency,
Arena & pool,
bool enable_fine_grained_shuffle,
bool enable_join_spill,
std::mutex & partition_lock)
bool enable_join_spill)
{
switch (type)
{
Expand All @@ -951,8 +946,7 @@ void insertFromBlockImpl(
insert_concurrency, \
pool, \
enable_fine_grained_shuffle, \
enable_join_spill, \
partition_lock); \
enable_join_spill); \
break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
Expand Down Expand Up @@ -1064,11 +1058,14 @@ void Join::insertFromBlock(const Block & block, size_t stream_index)
blocks_to_spill = trySpillBuildPartition(i, false);
else
stored_block = &(partitions[i].build_partition.blocks.back());
if (stored_block != nullptr)
{
insertFromBlockInternal(stored_block, i);
continue;
}
}
if (stored_block != nullptr)
insertFromBlockInternal(stored_block, i);
else
build_spiller->spillBlocks(blocks_to_spill, i);

build_spiller->spillBlocks(blocks_to_spill, i);
}
}
}
Expand Down Expand Up @@ -1165,16 +1162,16 @@ void Join::insertFromBlockInternal(Block * stored_block, size_t stream_index)
if (!getFullness(kind))
{
if (strictness == ASTTableJoin::Strictness::Any)
insertFromBlockImpl<ASTTableJoin::Strictness::Any>(type, maps_any, rows, key_columns, key_sizes, collators, stored_block, null_map, nullptr, stream_index, getBuildConcurrencyInternal(), *pools[stream_index], enable_fine_grained_shuffle, enable_join_spill, partitions_locks[stream_index]);
insertFromBlockImpl<ASTTableJoin::Strictness::Any>(type, maps_any, rows, key_columns, key_sizes, collators, stored_block, null_map, nullptr, stream_index, getBuildConcurrencyInternal(), *pools[stream_index], enable_fine_grained_shuffle, enable_join_spill);
else
insertFromBlockImpl<ASTTableJoin::Strictness::All>(type, maps_all, rows, key_columns, key_sizes, collators, stored_block, null_map, nullptr, stream_index, getBuildConcurrencyInternal(), *pools[stream_index], enable_fine_grained_shuffle, enable_join_spill, partitions_locks[stream_index]);
insertFromBlockImpl<ASTTableJoin::Strictness::All>(type, maps_all, rows, key_columns, key_sizes, collators, stored_block, null_map, nullptr, stream_index, getBuildConcurrencyInternal(), *pools[stream_index], enable_fine_grained_shuffle, enable_join_spill);
}
else
{
if (strictness == ASTTableJoin::Strictness::Any)
insertFromBlockImpl<ASTTableJoin::Strictness::Any>(type, maps_any_full, rows, key_columns, key_sizes, collators, stored_block, null_map, rows_not_inserted_to_map[stream_index].get(), stream_index, getBuildConcurrencyInternal(), *pools[stream_index], enable_fine_grained_shuffle, enable_join_spill, partitions_locks[stream_index]);
insertFromBlockImpl<ASTTableJoin::Strictness::Any>(type, maps_any_full, rows, key_columns, key_sizes, collators, stored_block, null_map, rows_not_inserted_to_map[stream_index].get(), stream_index, getBuildConcurrencyInternal(), *pools[stream_index], enable_fine_grained_shuffle, enable_join_spill);
else
insertFromBlockImpl<ASTTableJoin::Strictness::All>(type, maps_all_full, rows, key_columns, key_sizes, collators, stored_block, null_map, rows_not_inserted_to_map[stream_index].get(), stream_index, getBuildConcurrencyInternal(), *pools[stream_index], enable_fine_grained_shuffle, enable_join_spill, partitions_locks[stream_index]);
insertFromBlockImpl<ASTTableJoin::Strictness::All>(type, maps_all_full, rows, key_columns, key_sizes, collators, stored_block, null_map, rows_not_inserted_to_map[stream_index].get(), stream_index, getBuildConcurrencyInternal(), *pools[stream_index], enable_fine_grained_shuffle, enable_join_spill);
}
}
}
Expand Down Expand Up @@ -2517,6 +2514,7 @@ void Join::spillMostMemoryUsedPartitionIfNeed()

{
std::unique_lock lk(partitions_lock);
RUNTIME_CHECK_MSG(restore_round < 4, "max_bytes_before_external_join is too small, please make it larger and try again.");
if (max_bytes_before_external_join && getTotalByteCount() <= max_bytes_before_external_join)
{
return;
Expand Down Expand Up @@ -2611,58 +2609,73 @@ bool Join::hasPartitionSpilled()
std::tuple<JoinPtr, BlockInputStreamPtr, BlockInputStreamPtr, BlockInputStreamPtr> Join::getOneRestoreStream(size_t max_block_size)
{
std::unique_lock lk(partitions_lock);
assert(restore_build_streams.size() == restore_probe_streams.size() && restore_build_streams.size() == restore_non_joined_data_streams.size());
auto get_back_stream = [](BlockInputStreams & streams) {
BlockInputStreamPtr stream = streams.back();
streams.pop_back();
return stream;
};
if (!restore_build_streams.empty())
{
std::lock_guard err_lk(build_probe_mutex);
if (meet_error)
return {nullptr, nullptr, nullptr, nullptr};
}
LOG_DEBUG(log, fmt::format("restore_build_streams {}, restore_probe_streams {}, restore_non_joined_data_streams {}", restore_build_streams.size(), restore_build_streams.size(), restore_non_joined_data_streams.size()));
assert(restore_build_streams.size() == restore_probe_streams.size() && restore_build_streams.size() == restore_non_joined_data_streams.size());
try
{
auto get_back_stream = [](BlockInputStreams & streams) {
BlockInputStreamPtr stream = streams.back();
streams.pop_back();
return stream;
};
if (!restore_build_streams.empty())
{
auto build_stream = get_back_stream(restore_build_streams);
auto probe_stream = get_back_stream(restore_probe_streams);
auto non_joined_data_stream = get_back_stream(restore_non_joined_data_streams);
if (restore_build_streams.empty())
{
spilled_partition_indexes.pop_front();
}
return {restore_join, build_stream, probe_stream, non_joined_data_stream};
}
if (spilled_partition_indexes.empty())
{
return {nullptr, nullptr, nullptr, nullptr};
}
auto spilled_partition_index = spilled_partition_indexes.front();
RUNTIME_CHECK_MSG(partitions[spilled_partition_index].spill, "should not restore unspilled partition.");
if (restore_join_build_concurrency <= 0)
restore_join_build_concurrency = getRestoreJoinBuildConcurrency(partitions.size(), spilled_partition_indexes.size(), join_restore_concurrency, probe_concurrency);
assert(restore_join_build_concurrency >= 1);
LOG_DEBUG(log, "partition {}, round {}, build concurrency {}", spilled_partition_index, restore_round, restore_join_build_concurrency);
restore_build_streams = build_spiller->restoreBlocks(spilled_partition_index, restore_join_build_concurrency, true);
restore_probe_streams = probe_spiller->restoreBlocks(spilled_partition_index, restore_join_build_concurrency, true);
restore_non_joined_data_streams.resize(restore_join_build_concurrency, nullptr);
RUNTIME_CHECK_MSG(restore_build_streams.size() == static_cast<size_t>(restore_join_build_concurrency), "restore streams size must equal to restore_join_build_concurrency");
restore_join = createRestoreJoin();
restore_join->initBuild(restore_build_streams[0]->getHeader(), restore_join_build_concurrency);
restore_join->setInitActiveBuildConcurrency();
restore_join->initProbe(restore_probe_streams[0]->getHeader(), restore_join_build_concurrency);
for (Int64 i = 0; i < restore_join_build_concurrency; i++)
{
restore_build_streams[i] = std::make_shared<HashJoinBuildBlockInputStream>(restore_build_streams[i], restore_join, i, log->identifier());
}
auto build_stream = get_back_stream(restore_build_streams);
auto probe_stream = get_back_stream(restore_probe_streams);
auto non_joined_data_stream = get_back_stream(restore_non_joined_data_streams);
if (restore_build_streams.empty())
{
spilled_partition_indexes.pop_front();
}
if (needReturnNonJoinedData())
{
for (Int64 i = 0; i < restore_join_build_concurrency; i++)
restore_non_joined_data_streams[i] = restore_join->createStreamWithNonJoinedRows(probe_stream->getHeader(), i, restore_join_build_concurrency, max_block_size);
}
auto non_joined_data_stream = get_back_stream(restore_non_joined_data_streams);
return {restore_join, build_stream, probe_stream, non_joined_data_stream};
}
if (spilled_partition_indexes.empty())
catch (...)
{
return {nullptr, nullptr, nullptr, nullptr};
}
auto spilled_partition_index = spilled_partition_indexes.front();
RUNTIME_CHECK_MSG(partitions[spilled_partition_index].spill, "should not restore unspilled partition.");
if (restore_join_build_concurrency <= 0)
restore_join_build_concurrency = getRestoreJoinBuildConcurrency(partitions.size(), spilled_partition_indexes.size(), join_restore_concurrency, probe_concurrency);
assert(restore_join_build_concurrency >= 1);
LOG_DEBUG(log, "partition {}, round {}, build concurrency {}", spilled_partition_index, restore_round, restore_join_build_concurrency);
restore_build_streams = build_spiller->restoreBlocks(spilled_partition_index, restore_join_build_concurrency, true);
restore_probe_streams = probe_spiller->restoreBlocks(spilled_partition_index, restore_join_build_concurrency, true);
RUNTIME_CHECK_MSG(restore_build_streams.size() == static_cast<size_t>(restore_join_build_concurrency), "restore streams size must equal to restore_join_build_concurrency");
restore_join = createRestoreJoin();
restore_join->initBuild(restore_build_streams[0]->getHeader(), restore_join_build_concurrency);
restore_join->setInitActiveBuildConcurrency();
restore_join->initProbe(restore_probe_streams[0]->getHeader(), restore_join_build_concurrency);
for (Int64 i = 0; i < restore_join_build_concurrency; i++)
{
restore_build_streams[i] = std::make_shared<HashJoinBuildBlockInputStream>(restore_build_streams[i], restore_join, i, log->identifier());
}
auto build_stream = get_back_stream(restore_build_streams);
auto probe_stream = get_back_stream(restore_probe_streams);
if (restore_build_streams.empty())
{
spilled_partition_indexes.pop_front();
}
restore_non_joined_data_streams.resize(restore_join_build_concurrency, nullptr);
if (needReturnNonJoinedData())
{
for (Int64 i = 0; i < restore_join_build_concurrency; i++)
restore_non_joined_data_streams[i] = restore_join->createStreamWithNonJoinedRows(probe_stream->getHeader(), i, restore_join_build_concurrency, max_block_size);
auto err_message = getCurrentExceptionMessage(false, true);
meetError(err_message);
throw Exception(err_message);
}
auto non_joined_data_stream = get_back_stream(restore_non_joined_data_streams);
return {restore_join, build_stream, probe_stream, non_joined_data_stream};
}

void Join::dispatchProbeBlock(Block & block, std::list<std::tuple<size_t, Block>> & partition_blocks_list)
Expand Down

0 comments on commit 29e2261

Please sign in to comment.