Use try lock first for insertBlockIntoMapsTypeCase (#7369)
ref #6518
SeaRise authored Jun 28, 2023
1 parent 787ab97 commit 874c2c4
Showing 2 changed files with 125 additions and 41 deletions.
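The change makes the build-side insertion try non-blocking locks on the hash-map segments first and fall back to blocking locks only for the segments that remain. A minimal, self-contained sketch of that try-lock-first idea, with illustrative names and container types rather than the TiFlash API:

    #include <cstddef>
    #include <list>
    #include <mutex>
    #include <vector>

    // Sketch: each segment owns a mutex and a bucket of rows. Up to two try-lock
    // passes insert into whichever segments are free right now; a final pass
    // blocks on whatever is left.
    void insertAll(
        std::vector<std::mutex> & locks,
        std::vector<std::vector<int>> & segments,
        const std::vector<std::vector<int>> & pending_rows)
    {
        std::list<std::size_t> pending;
        for (std::size_t i = 0; i < segments.size(); ++i)
            pending.push_back(i);

        for (int round = 0; round < 2 && pending.size() > 1; ++round)
        {
            for (auto it = pending.begin(); it != pending.end();)
            {
                std::unique_lock<std::mutex> lock(locks[*it], std::try_to_lock);
                if (lock.owns_lock())
                {
                    auto & dst = segments[*it];
                    dst.insert(dst.end(), pending_rows[*it].begin(), pending_rows[*it].end());
                    it = pending.erase(it);
                }
                else
                    ++it;
            }
        }

        // Blocking fallback for the segments that stayed contended.
        for (std::size_t idx : pending)
        {
            std::lock_guard<std::mutex> lock(locks[idx]);
            auto & dst = segments[idx];
            dst.insert(dst.end(), pending_rows[idx].begin(), pending_rows[idx].end());
        }
    }

The real change additionally starts each stream at a different segment and keeps a separate bucket for null-keyed rows, as the diff below shows.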
165 changes: 124 additions & 41 deletions dbms/src/Interpreters/JoinPartition.cpp
@@ -257,6 +257,10 @@ std::unique_lock<std::mutex> JoinPartition::lockPartition()
{
return std::unique_lock(partition_mutex);
}
std::unique_lock<std::mutex> JoinPartition::tryLockPartition()
{
return std::unique_lock(partition_mutex, std::try_to_lock);
}
void JoinPartition::releaseBuildPartitionBlocks(std::unique_lock<std::mutex> &)
{
auto released_bytes = build_partition.bytes;
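tryLockPartition simply constructs a std::unique_lock with the std::try_to_lock tag, so the returned lock may or may not own the mutex and the caller has to check it; the diff below does that through the lock's bool conversion in if (auto try_lock = join_partition->tryLockPartition(); try_lock). A small standalone illustration of the standard-library behaviour:

    #include <iostream>
    #include <mutex>
    #include <thread>

    int main()
    {
        std::mutex m;
        std::unique_lock<std::mutex> holder(m); // blocking acquire; main thread now owns m

        std::thread other([&] {
            // Non-blocking attempt from another thread: returns immediately instead of waiting.
            std::unique_lock<std::mutex> lk(m, std::try_to_lock);
            std::cout << "while held: owns_lock = " << lk.owns_lock() << '\n'; // prints 0
        });
        other.join();

        holder.unlock();
        std::unique_lock<std::mutex> retry(m, std::try_to_lock);
        std::cout << "after release: owns_lock = " << retry.owns_lock() << '\n'; // prints 1
    }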
@@ -436,17 +440,17 @@ struct Inserter<ASTTableJoin::Strictness::All, Map, KeyGetter>
};

/// insert Block into one map, don't need acquire lock inside this function
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map, bool need_record_not_insert_rows>
void NO_INLINE insertBlockIntoMapTypeCase(
JoinPartition & join_partition,
size_t rows,
const ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
const TiDB::TiDBCollators & collators,
Block * stored_block,
ConstNullMapPtr null_map)
ConstNullMapPtr null_map,
RowsNotInsertToMap * rows_not_inserted_to_map)
{
auto * rows_not_inserted_to_map = join_partition.getRowsNotInsertedToMap();
auto & pool = *join_partition.getPartitionPool();

Map & map = join_partition.template getHashMap<Map>();
@@ -457,14 +461,17 @@ void NO_INLINE insertBlockIntoMapTypeCase(
bool null_need_materialize = isNullAwareSemiFamily(join_partition.getJoinKind());
for (size_t i = 0; i < rows; ++i)
{
if (has_null_map && (*null_map)[i])
if constexpr (has_null_map)
{
if (rows_not_inserted_to_map)
if ((*null_map)[i])
{
/// for right/full out join or null-aware semi join, need to insert into rows_not_inserted_to_map
rows_not_inserted_to_map->insertRow(stored_block, i, null_need_materialize, pool);
if constexpr (need_record_not_insert_rows)
{
/// for right/full out join or null-aware semi join, need to insert into rows_not_inserted_to_map
rows_not_inserted_to_map->insertRow(stored_block, i, null_need_materialize, pool);
}
continue;
}
continue;
}

Inserter<STRICTNESS, Map, KeyGetter>::insert(
@@ -478,7 +485,7 @@ void NO_INLINE insertBlockIntoMapTypeCase(
}
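In the rewritten row loop above, has_null_map and need_record_not_insert_rows are tested with if constexpr, so instantiations where a flag is false compile no per-row branch for it at all; that is also why the callers further down can pass nullptr for rows_not_inserted_to_map in the false case. A reduced illustration of that compile-time pruning, with made-up names:

    #include <cstddef>
    #include <vector>

    // When Track == false the tracking branch is discarded at compile time, so
    // countPositive<false>(values, nullptr) is safe: the dereference is never compiled.
    template <bool Track>
    std::size_t countPositive(const std::vector<int> & values, std::vector<int> * skipped)
    {
        std::size_t count = 0;
        for (int v : values)
        {
            if (v <= 0)
            {
                if constexpr (Track)
                    skipped->push_back(v); // only present in the <true> instantiation
                continue;
            }
            ++count;
        }
        return count;
    }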

/// insert Block into maps, for each map, need to acquire lock before insert
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map, bool need_record_not_insert_rows>
void NO_INLINE insertBlockIntoMapsTypeCase(
JoinPartitions & join_partitions,
size_t rows,
@@ -487,10 +494,10 @@ void NO_INLINE insertBlockIntoMapsTypeCase(
const TiDB::TiDBCollators & collators,
Block * stored_block,
ConstNullMapPtr null_map,
size_t stream_index)
size_t stream_index,
RowsNotInsertToMap * rows_not_inserted_to_map)
{
auto & current_join_partition = join_partitions[stream_index];
auto * rows_not_inserted_to_map = current_join_partition->getRowsNotInsertedToMap();
auto & pool = *current_join_partition->getPartitionPool();

/// use this map to calculate key hash
@@ -506,7 +513,7 @@ void NO_INLINE insertBlockIntoMapsTypeCase(
/// 2. hash value is calculated twice, maybe we can refine the code to cache the hash value
/// 3. extra memory to store the segment index info
std::vector<std::vector<size_t>> segment_index_info;
if (has_null_map && rows_not_inserted_to_map)
if constexpr (has_null_map && need_record_not_insert_rows)
{
segment_index_info.resize(segment_size + 1);
}
@@ -521,11 +528,14 @@
}
for (size_t i = 0; i < rows; ++i)
{
if (has_null_map && (*null_map)[i])
if constexpr (has_null_map)
{
if (rows_not_inserted_to_map)
segment_index_info[segment_index_info.size() - 1].push_back(i);
continue;
if ((*null_map)[i])
{
if constexpr (need_record_not_insert_rows)
segment_index_info.back().push_back(i);
continue;
}
}
auto key_holder = key_getter.getKeyHolder(i, &pool, sort_key_containers);
SCOPE_EXIT(keyHolderDiscardKey(key_holder));
@@ -540,32 +550,76 @@ void NO_INLINE insertBlockIntoMapsTypeCase(
segment_index_info[segment_index].push_back(i);
}

bool null_need_materialize = isNullAwareSemiFamily(current_join_partition->getJoinKind());
for (size_t insert_index = 0; insert_index < segment_index_info.size(); insert_index++)
std::list<size_t> insert_indexes;
for (size_t i = 0; i < segment_index_info.size(); ++i)
{
size_t insert_index = (i + stream_index) % segment_index_info.size();
insert_indexes.emplace_back(insert_index);
}

#define INSERT_TO_MAP(join_partition, segment_index) \
auto & current_map = (join_partition)->getHashMap<Map>(); \
for (auto & i : (segment_index)) \
{ \
Inserter<STRICTNESS, Map, KeyGetter>::insert(current_map, key_getter, stored_block, i, pool, sort_key_containers); \
}

#define INSERT_TO_NOT_INSERTED_MAP \
/* null value */ \
/* here ignore mutex because rows_not_inserted_to_map is privately owned by each stream thread */ \
/* for right/full out join or null-aware semi join, need to insert into rows_not_inserted_to_map */ \
assert(rows_not_inserted_to_map != nullptr); \
assert(segment_index_info.size() == (1 + segment_size)); \
bool null_need_materialize = isNullAwareSemiFamily(current_join_partition->getJoinKind()); \
for (auto index : segment_index_info[segment_size]) \
{ \
rows_not_inserted_to_map->insertRow(stored_block, index, null_need_materialize, pool); \
}

// First, traverse the segments with try locks (up to two passes), inserting into every
// segment whose lock can be acquired immediately.
//
// Once only one segment is left, a try lock would only waste CPU,
// so a blocking lock is used directly for it.
for (size_t i = 0; i <= 1 && insert_indexes.size() > 1; ++i)
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_build_failpoint);
size_t segment_index = (insert_index + stream_index) % segment_index_info.size();
if (segment_index == segment_size)
for (auto it = insert_indexes.begin(); it != insert_indexes.end();)
{
/// null value
/// here ignore mutex because rows_not_inserted_to_map is privately owned by each stream thread
for (auto index : segment_index_info[segment_index])
auto segment_index = *it;
if (segment_index == segment_size)
{
/// for right/full out join or null-aware semi join, need to insert into rows_not_inserted_to_map
RUNTIME_ASSERT(rows_not_inserted_to_map != nullptr);
rows_not_inserted_to_map->insertRow(stored_block, index, null_need_materialize, pool);
INSERT_TO_NOT_INSERTED_MAP
it = insert_indexes.erase(it);
}
}
else
{
auto lock = join_partitions[segment_index]->lockPartition();
auto & current_map = join_partitions[segment_index]->getHashMap<Map>();
for (auto & i : segment_index_info[segment_index])
else
{
Inserter<STRICTNESS, Map, KeyGetter>::insert(current_map, key_getter, stored_block, i, pool, sort_key_containers);
auto & join_partition = join_partitions[segment_index];
if (auto try_lock = join_partition->tryLockPartition(); try_lock)
{
INSERT_TO_MAP(join_partition, segment_index_info[segment_index]);
it = insert_indexes.erase(it);
}
else
{
++it;
}
}
}
}

// Then insert the remaining segments under blocking locks, avoiding the CPU cost of repeatedly retrying try locks.
for (auto segment_index : insert_indexes)
{
// When segment_index equals segment_size (the null-value rows), it must already have been processed in the first step.
RUNTIME_CHECK_MSG(segment_index < segment_size, "Internal Error: When segment_index is segment_size, it must be processed in first step.");
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_build_failpoint);
auto & join_partition = join_partitions[segment_index];
auto lock = join_partition->lockPartition();
INSERT_TO_MAP(join_partition, segment_index_info[segment_index]);
}

#undef INSERT_TO_MAP
#undef INSERT_TO_NOT_INSERTED_MAP
}
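insert_indexes is filled starting from the stream's own index, (i + stream_index) % segment_index_info.size(), presumably so that concurrent build streams begin their try-lock passes on different segments and contend less. A tiny sketch of the resulting visit order, with illustrative names:

    #include <cstddef>
    #include <cstdio>
    #include <vector>

    // Visit order for one stream: start at its own index and wrap around.
    std::vector<std::size_t> visitOrder(std::size_t stream_index, std::size_t segments)
    {
        std::vector<std::size_t> order;
        for (std::size_t i = 0; i < segments; ++i)
            order.push_back((i + stream_index) % segments);
        return order;
    }

    int main()
    {
        for (std::size_t stream = 0; stream < 3; ++stream)
        {
            std::printf("stream %zu:", stream);
            for (std::size_t s : visitOrder(stream, 4))
                std::printf(" %zu", s);
            std::printf("\n"); // stream 0: 0 1 2 3 / stream 1: 1 2 3 0 / stream 2: 2 3 0 1
        }
    }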

template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
@@ -583,39 +637,68 @@ void insertBlockIntoMapsImplType(
bool enable_join_spill)
{
auto & current_join_partition = join_partitions[stream_index];
auto * rows_not_inserted_to_map = current_join_partition->getRowsNotInsertedToMap();
if (enable_join_spill)
{
/// case 1, join with spill support, the partition level lock is acquired in `Join::insertFromBlock`
if (null_map)
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, true>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map);
{
if (rows_not_inserted_to_map)
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, true, true>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map, rows_not_inserted_to_map);
else
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, true, false>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map, nullptr);
}
else
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, false>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map);
{
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, false, false>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map, nullptr);
}
return;
}
else if (enable_fine_grained_shuffle)
{
/// case 2, join with fine_grained_shuffle, no need to acquire any lock
if (null_map)
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, true>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map);
{
if (rows_not_inserted_to_map)
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, true, true>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map, rows_not_inserted_to_map);
else
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, true, false>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map, nullptr);
}
else
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, false>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map);
{
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, false, false>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map, nullptr);
}
}
else if (insert_concurrency > 1)
{
/// case 3, normal join with concurrency > 1, will acquire lock in `insertBlockIntoMapsTypeCase`
if (null_map)
insertBlockIntoMapsTypeCase<STRICTNESS, KeyGetter, Map, true>(join_partitions, rows, key_columns, key_sizes, collators, stored_block, null_map, stream_index);
{
if (rows_not_inserted_to_map)
insertBlockIntoMapsTypeCase<STRICTNESS, KeyGetter, Map, true, true>(join_partitions, rows, key_columns, key_sizes, collators, stored_block, null_map, stream_index, rows_not_inserted_to_map);
else
insertBlockIntoMapsTypeCase<STRICTNESS, KeyGetter, Map, true, false>(join_partitions, rows, key_columns, key_sizes, collators, stored_block, null_map, stream_index, nullptr);
}
else
insertBlockIntoMapsTypeCase<STRICTNESS, KeyGetter, Map, false>(join_partitions, rows, key_columns, key_sizes, collators, stored_block, null_map, stream_index);
{
insertBlockIntoMapsTypeCase<STRICTNESS, KeyGetter, Map, false, false>(join_partitions, rows, key_columns, key_sizes, collators, stored_block, null_map, stream_index, nullptr);
}
}
else
{
/// case 4, normal join with concurrency == 1, no need to acquire any lock
RUNTIME_CHECK(stream_index == 0);
if (null_map)
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, true>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map);
{
if (rows_not_inserted_to_map)
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, true, true>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map, rows_not_inserted_to_map);
else
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, true, false>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map, nullptr);
}
else
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, false>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map);
{
insertBlockIntoMapTypeCase<STRICTNESS, KeyGetter, Map, false, false>(*current_join_partition, rows, key_columns, key_sizes, collators, stored_block, null_map, nullptr);
}
}
}
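insertBlockIntoMapsImplType now also turns the rows_not_inserted_to_map null check into a template flag (need_record_not_insert_rows), decided once at the call site rather than per row, alongside the existing has_null_map flag. A reduced sketch of that runtime-to-compile-time dispatch, with illustrative names:

    #include <cstdio>

    // The flags are ordinary bools at the call site but become compile-time
    // constants inside the implementation, where if constexpr can use them.
    template <bool HasNullMap, bool RecordNotInserted>
    void insertImpl()
    {
        std::printf("instantiation: has_null_map=%d, record_not_inserted=%d\n",
                    HasNullMap, RecordNotInserted);
    }

    void insert(bool has_null_map, bool record_not_inserted)
    {
        if (has_null_map)
        {
            if (record_not_inserted)
                insertImpl<true, true>();
            else
                insertImpl<true, false>();
        }
        else
        {
            // Recording null rows only matters when a null map exists,
            // so the <false, true> combination is never instantiated.
            insertImpl<false, false>();
        }
    }

    int main()
    {
        insert(true, true);
        insert(true, false);
        insert(false, false);
    }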

1 change: 1 addition & 0 deletions dbms/src/Interpreters/JoinPartition.h
@@ -111,6 +111,7 @@ class JoinPartition
return trySpillBuildPartition(force, max_cached_data_bytes, lock);
}
std::unique_lock<std::mutex> lockPartition();
std::unique_lock<std::mutex> tryLockPartition();
/// use lock as the argument to force the caller acquire the lock before call them
void releaseBuildPartitionBlocks(std::unique_lock<std::mutex> &);
void releaseProbePartitionBlocks(std::unique_lock<std::mutex> &);
