From cd4f7727d1c0137548de0c895df9d8cd4637e56e Mon Sep 17 00:00:00 2001
From: xufei
Date: Tue, 7 Mar 2023 14:28:10 +0800
Subject: [PATCH] spill support fine grained shuffle (#8)

Signed-off-by: xufei
---
 .../HashJoinProbeBlockInputStream.cpp         |   4 +-
 dbms/src/Flash/Statistics/JoinImpl.cpp        |  10 +-
 dbms/src/Flash/Statistics/JoinImpl.h          |   4 +-
 dbms/src/Flash/tests/gtest_join_executor.cpp  |  41 +--
 dbms/src/Interpreters/Join.cpp                | 240 ++++++++++--------
 dbms/src/Interpreters/Join.h                  |   8 +-
 6 files changed, 181 insertions(+), 126 deletions(-)

diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
index e83fe38cbd5..ff97460434f 100644
--- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
+++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
@@ -135,7 +135,7 @@ Block HashJoinProbeBlockInputStream::getOutputBlock()
         size_t partition_index = 0;
         Block block;
-        if (!join->isEnableSpill())
+        if (!join->isSpilled())
         {
             block = current_probe_stream->read();
         }
@@ -148,7 +148,7 @@ Block HashJoinProbeBlockInputStream::getOutputBlock()
 
         if (!block)
         {
-            if (join->isEnableSpill())
+            if (join->isSpilled())
             {
                 block = current_probe_stream->read();
                 if (block)
diff --git a/dbms/src/Flash/Statistics/JoinImpl.cpp b/dbms/src/Flash/Statistics/JoinImpl.cpp
index 9faf3da16eb..d55ef1aa262 100644
--- a/dbms/src/Flash/Statistics/JoinImpl.cpp
+++ b/dbms/src/Flash/Statistics/JoinImpl.cpp
@@ -20,11 +20,13 @@ namespace DB
 void JoinStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const
 {
     fmt_buffer.fmtAppend(
-        R"("hash_table_bytes":{},"build_side_child":"{}",)"
+        R"("peak_build_bytes_usage":{},"build_side_child":"{}","is_spill_enabled":{},"is_spilled":{},)"
         R"("non_joined_outbound_rows":{},"non_joined_outbound_blocks":{},"non_joined_outbound_bytes":{},"non_joined_execution_time_ns":{},)"
         R"("join_build_inbound_rows":{},"join_build_inbound_blocks":{},"join_build_inbound_bytes":{},"join_build_execution_time_ns":{})",
-        hash_table_bytes,
+        peak_build_bytes_usage,
         build_side_child,
+        is_spill_enabled,
+        is_spilled,
         non_joined_base.rows,
         non_joined_base.blocks,
         non_joined_base.bytes,
@@ -42,8 +44,10 @@ void JoinStatistics::collectExtraRuntimeDetail()
     if (it != join_execute_info_map.end())
     {
         const auto & join_execute_info = it->second;
-        hash_table_bytes = join_execute_info.join_ptr->getTotalByteCount();
+        peak_build_bytes_usage = join_execute_info.join_ptr->getPeakBuildBytesUsage();
         build_side_child = join_execute_info.build_side_root_executor_id;
+        is_spill_enabled = join_execute_info.join_ptr->isEnableSpill();
+        is_spilled = join_execute_info.join_ptr->isSpilled();
         for (const auto & non_joined_stream : join_execute_info.non_joined_streams)
         {
             if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(non_joined_stream.get()); p_stream)
diff --git a/dbms/src/Flash/Statistics/JoinImpl.h b/dbms/src/Flash/Statistics/JoinImpl.h
index 3fedb77f2ff..b6a74fa0b5d 100644
--- a/dbms/src/Flash/Statistics/JoinImpl.h
+++ b/dbms/src/Flash/Statistics/JoinImpl.h
@@ -39,8 +39,10 @@ class JoinStatistics : public JoinStatisticsBase
     JoinStatistics(const tipb::Executor * executor, DAGContext & dag_context_);
 
 private:
-    size_t hash_table_bytes = 0;
+    size_t peak_build_bytes_usage = 0;
     String build_side_child;
+    bool is_spill_enabled = false;
+    bool is_spilled = false;
 
     BaseRuntimeStatistics non_joined_base;
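Aside (illustration, not part of the patch): the raw-string fragments in appendExtraJson concatenate into a single format string, so the first fragment has to end with a comma for the emitted JSON to stay well-formed. A minimal, self-contained sketch of the same concatenation rule; std::printf stands in for FmtBuffer::fmtAppend and all values are invented:

    #include <cstdio>

    int main()
    {
        // Adjacent raw-string literals are joined at compile time, exactly as in
        // JoinStatistics::appendExtraJson; note the trailing comma in the first piece.
        std::printf("{"
                    R"("peak_build_bytes_usage":%zu,"is_spill_enabled":%s,"is_spilled":%s,)"
                    R"("join_build_inbound_rows":%zu)"
                    "}\n",
                    static_cast<std::size_t>(104857600), "true", "false",
                    static_cast<std::size_t>(51200));
        return 0;
    }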
diff --git a/dbms/src/Flash/tests/gtest_join_executor.cpp b/dbms/src/Flash/tests/gtest_join_executor.cpp
index 30debc69b65..29cf1e1f935 100644
--- a/dbms/src/Flash/tests/gtest_join_executor.cpp
+++ b/dbms/src/Flash/tests/gtest_join_executor.cpp
@@ -771,7 +771,8 @@ try
     size_t table_rows = 51200;
     size_t common_rows = 10240;
     UInt64 max_block_size = 800;
-    size_t original_max_streams = 20;
+    size_t max_streams_big = 20;
+    size_t max_streams_small = 4;
     for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(left_column_infos))
     {
         ColumnGeneratorOpts opts{common_rows, getDataTypeByColumnInfoForComputingLayer(column_info)->getName(), RANDOM, column_info.name};
@@ -832,7 +833,7 @@ try
                       .build(context);
     context.context.setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
     /// use right_table left join left_table as the reference
-    auto ref_columns = executeStreams(request, original_max_streams);
+    auto ref_columns = executeStreams(request, max_streams_big);
 
     /// case 1.1 table scan join table scan
     for (auto & left_table_name : left_table_names)
@@ -844,17 +845,18 @@ try
         for (auto & right_table_name : right_table_names)
         {
             request = context
                          .scan("outer_join_test", left_table_name)
                          .join(context.scan("outer_join_test", right_table_name), tipb::JoinType::TypeRightOuterJoin, {col("a")})
                          .build(context);
            context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(0)));
-            auto result_columns = executeStreams(request, original_max_streams);
-            ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
+            ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
+            ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_small));
            // test spill to disk
            context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
            if (right_table_name == "right_table_1_concurrency")
            {
-                ASSERT_THROW(executeStreams(request, original_max_streams), Exception);
+                ASSERT_THROW(executeStreams(request, max_streams_big), Exception);
            }
            else
            {
-                ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
+                ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
+                ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_small));
            }
        }
    }
@@ -868,18 +870,19 @@ try
                      .join(context.receive(fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {}, {}, {}, exchange_concurrency)
                      .build(context);
        context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(0)));
-        auto result_columns = executeStreams(request, original_max_streams);
-        ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
+        ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
+        ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_small));
        // test spill to disk
        context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
        if (exchange_concurrency == 1)
        {
-            ASSERT_THROW(executeStreams(request, original_max_streams), Exception);
+            ASSERT_THROW(executeStreams(request, max_streams_big), Exception);
        }
        else
        {
-            ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
+            ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
+            ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_small));
        }
    }
 }
@@ -892,7 +895,7 @@ try
     context.context.setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
     /// use right_table left join left_table as the reference
     context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(0)));
-    ref_columns = executeStreams(request, original_max_streams);
+    ref_columns = executeStreams(request, max_streams_big);
 
     /// case 2.1 table scan join table scan
     for (auto & left_table_name : left_table_names)
     {
@@ -903,18 +906,18 @@ try
                          .join(context.scan("outer_join_test", right_table_name), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(right_table_name + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, 0)
                          .build(context);
            context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(0)));
-            auto result_columns = executeStreams(request, original_max_streams);
-            ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
+            ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
            // test spill to disk
            context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
            if (right_table_name == "right_table_1_concurrency")
            {
-                ASSERT_THROW(executeStreams(request, original_max_streams), Exception);
+                ASSERT_THROW(executeStreams(request, max_streams_big), Exception);
            }
            else
            {
-                ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
+                ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
+                ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_small));
            }
        }
    }
@@ -929,18 +932,18 @@ try
                      .join(context.receive(fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(exchange_name + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, exchange_concurrency)
                      .build(context);
        context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(0)));
-        auto result_columns = executeStreams(request, original_max_streams);
-        ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
+        ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
        // test spill to disk
        context.context.setSetting("max_bytes_before_external_join", Field(static_cast<UInt64>(max_bytes_before_external_join)));
        if (exchange_concurrency == 1)
        {
-            ASSERT_THROW(executeStreams(request, original_max_streams), Exception);
+            ASSERT_THROW(executeStreams(request, max_streams_big), Exception);
        }
        else
        {
-            ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
+            ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_big));
+            ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, max_streams_small));
        }
    }
 }
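Aside (illustration, not TiFlash code): every ASSERT_THROW branch above shares one precondition. A tiny sketch of the invariant the tests exercise; the reasoning is drawn from the RUNTIME_CHECK_MSG in Join::spillMostMemoryUsedPartitionIfNeed further down in this patch:

    #include <cassert>
    #include <cstddef>

    // Hash-partition spill needs at least two partitions: with a single build
    // partition, a restore round would re-read exactly the data that was just
    // spilled and could never shrink the input, so the join rejects it.
    bool spillSupported(std::size_t build_concurrency) { return build_concurrency > 1; }

    int main()
    {
        assert(!spillSupported(1));  // right_table_1_concurrency / exchange_concurrency == 1 cases
        assert(spillSupported(20));  // the max_streams_big cases run to completion
        return 0;
    }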
diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp
index 3153366a05b..7c12907a387 100644
--- a/dbms/src/Interpreters/Join.cpp
+++ b/dbms/src/Interpreters/Join.cpp
@@ -131,6 +131,63 @@ size_t getRestoreJoinBuildConcurrency(size_t total_partitions, size_t spilled_pa
         return std::max(2, restore_build_concurrency);
     }
 }
+UInt64 inline updateHashValue(size_t restore_round, UInt64 x)
+{
+    static std::vector<UInt64> hash_constants{0xff51afd7ed558ccdULL, 0xc4ceb9fe1a85ec53ULL, 0xde43a68e4d184aa3ULL, 0x86f1fda459fa47c7ULL, 0xd91419add64f471fULL, 0xc18eea9cbe12489eULL, 0x2cb94f36b9fe4c38ULL, 0xef0f50cc5f0c4cbaULL};
+    static size_t hash_constants_size = hash_constants.size();
+    assert(hash_constants_size > 0 && (hash_constants_size & (hash_constants_size - 1)) == 0);
+    assert(restore_round != 0);
+    x ^= x >> 33;
+    x *= hash_constants[restore_round & (hash_constants_size - 1)];
+    x ^= x >> 33;
+    x *= hash_constants[(restore_round + 1) & (hash_constants_size - 1)];
+    x ^= x >> 33;
+    return x;
+}
+struct JoinBuildInfo
+{
+    bool enable_fine_grained_shuffle;
+    size_t fine_grained_shuffle_count;
+    bool enable_spill;
+    bool is_spilled;
+    size_t build_concurrency;
+    size_t restore_round;
+    bool needVirtualDispatchForProbeBlock() const
+    {
+        return enable_fine_grained_shuffle || (enable_spill && !is_spilled);
+    }
+};
+ColumnRawPtrs extractAndMaterializeKeyColumns(const Block & block, Columns & materialized_columns, const Strings & key_columns_names)
+{
+    ColumnRawPtrs key_columns(key_columns_names.size());
+    for (size_t i = 0; i < key_columns_names.size(); ++i)
+    {
+        key_columns[i] = block.getByName(key_columns_names[i]).column.get();
+
+        if (ColumnPtr converted = key_columns[i]->convertToFullColumnIfConst())
+        {
+            materialized_columns.emplace_back(converted);
+            key_columns[i] = materialized_columns.back().get();
+        }
+    }
+    return key_columns;
+}
+
+void computeDispatchHash(size_t rows,
+                         const ColumnRawPtrs & key_columns,
+                         const TiDB::TiDBCollators & collators,
+                         std::vector<String> & partition_key_containers,
+                         size_t join_restore_round,
+                         WeakHash32 & hash)
+{
+    HashBaseWriterHelper::computeHash(rows, key_columns, collators, partition_key_containers, hash);
+    if (join_restore_round != 0)
+    {
+        auto & data = hash.getData();
+        for (size_t i = 0; i < rows; ++i)
+            data[i] = updateHashValue(join_restore_round, data[i]);
+    }
+}
 } // namespace
 
 const std::string Join::match_helper_prefix = "__left-semi-join-match-helper";
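Aside (illustration, not part of the patch): a standalone restatement of updateHashValue under the same constants, showing the point of the per-round remix — keys that collapsed into one spilled partition in round N get re-scattered across partitions in round N + 1:

    #include <cassert>
    #include <cstdint>
    #include <cstdio>
    #include <vector>

    // Murmur-style finalizer (xor-shift / multiply), keyed by the restore round
    // so each round uses a different mixing constant.
    uint64_t updateHashValue(std::size_t restore_round, uint64_t x)
    {
        static const std::vector<uint64_t> hash_constants{
            0xff51afd7ed558ccdULL, 0xc4ceb9fe1a85ec53ULL, 0xde43a68e4d184aa3ULL, 0x86f1fda459fa47c7ULL,
            0xd91419add64f471fULL, 0xc18eea9cbe12489eULL, 0x2cb94f36b9fe4c38ULL, 0xef0f50cc5f0c4cbaULL};
        static const std::size_t n = hash_constants.size(); // power of two, so & (n - 1) == % n
        assert(restore_round != 0);
        x ^= x >> 33;
        x *= hash_constants[restore_round & (n - 1)];
        x ^= x >> 33;
        x *= hash_constants[(restore_round + 1) & (n - 1)];
        x ^= x >> 33;
        return x;
    }

    int main()
    {
        // The same input hash usually lands in different partitions in different
        // rounds (an illustration, not a guarantee for every value).
        const uint64_t h = 0x123456789abcdef0ULL;
        std::printf("round 1 partition: %llu, round 2 partition: %llu\n",
                    static_cast<unsigned long long>(updateHashValue(1, h) % 4),
                    static_cast<unsigned long long>(updateHashValue(2, h) % 4));
        return 0;
    }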
@@ -545,40 +602,50 @@ size_t Join::getPartitionByteCount(size_t partition_index) const
     return ret;
 }
 
-size_t Join::getTotalByteCount() const
+size_t Join::getTotalByteCount()
 {
     size_t res = 0;
     if (isEnableSpill())
     {
         for (const auto & join_partition : partitions)
             res += join_partition->memory_usage;
-        return res;
-    }
-
-    if (type == Type::CROSS)
-    {
-        for (const auto & block : blocks)
-            res += block.bytes();
     }
     else
     {
-        for (const auto & block : original_blocks)
-            res += block.bytes();
-        res += getTotalByteCountImpl(maps_any, type);
-        res += getTotalByteCountImpl(maps_all, type);
-        res += getTotalByteCountImpl(maps_any_full, type);
-        res += getTotalByteCountImpl(maps_all_full, type);
-
-        for (const auto & partition : partitions)
+        if (type == Type::CROSS)
         {
-            /// note the return value might not be accurate since it does not use lock, but should be enough for current usage
-            res += partition->pool->size();
+            for (const auto & block : blocks)
+                res += block.bytes();
+        }
+        else
+        {
+            for (const auto & block : original_blocks)
+                res += block.bytes();
+            res += getTotalByteCountImpl(maps_any, type);
+            res += getTotalByteCountImpl(maps_all, type);
+            res += getTotalByteCountImpl(maps_any_full, type);
+            res += getTotalByteCountImpl(maps_all_full, type);
+
+            for (const auto & partition : partitions)
+            {
+                /// note the return value might not be accurate since it does not use lock, but should be enough for current usage
+                res += partition->pool->size();
+            }
         }
     }
+    if (res > peak_build_bytes_usage)
+        peak_build_bytes_usage = res;
     return res;
 }
 
+size_t Join::getPeakBuildBytesUsage()
+{
+    /// call `getTotalByteCount` first to make sure peak_build_bytes_usage has a meaningful value
+    getTotalByteCount();
+    return peak_build_bytes_usage;
+}
+
 void Join::setBuildConcurrencyAndInitJoinPartition(size_t build_concurrency_)
 {
     if (unlikely(build_concurrency > 0))
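Aside (a minimal model, not TiFlash code): the bookkeeping getTotalByteCount now performs. The peak is a running maximum — which is why the method loses its const qualifier, and why getPeakBuildBytesUsage refreshes it before reading:

    #include <atomic>
    #include <cassert>
    #include <cstddef>

    struct PeakTracker
    {
        std::atomic<std::size_t> peak{0};

        // Mirrors the update at the end of Join::getTotalByteCount: record the
        // high-water mark of the current byte count (single-writer assumption,
        // as in the patch).
        std::size_t observe(std::size_t current)
        {
            if (current > peak.load())
                peak.store(current);
            return current;
        }
    };

    int main()
    {
        PeakTracker t;
        t.observe(100);
        t.observe(300);               // peak grows
        t.observe(50);                // e.g. after a partition is spilled and freed
        assert(t.peak.load() == 300); // what getPeakBuildBytesUsage() would report
        return 0;
    }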
@@ -1052,14 +1119,26 @@ void Join::insertFromBlock(const Block & block, size_t stream_index)
     else
     {
         // LOG_INFO(log, "enable spill, max_bytes_before_external_join {}, current bytes {}", max_bytes_before_external_join, getTotalByteCount());
-        auto dispatch_blocks = dispatchBlock(key_names_right, block);
+        Blocks dispatch_blocks;
+        if (enable_fine_grained_shuffle)
+        {
+            dispatch_blocks.resize(build_concurrency, {});
+            dispatch_blocks[stream_index] = block;
+        }
+        else
+        {
+            dispatch_blocks = dispatchBlock(key_names_right, block);
+        }
         assert(dispatch_blocks.size() == build_concurrency);
 
         size_t bytes_to_be_added = 0;
         for (const auto & partition_block : dispatch_blocks)
         {
-            bytes_to_be_added += partition_block.bytes();
+            if (partition_block)
+            {
+                bytes_to_be_added += partition_block.bytes();
+            }
         }
-        bool force_spill_partition_blocks = false;
         {
             std::unique_lock lk(build_probe_mutex);
@@ -1123,25 +1202,13 @@ bool Join::isEnableSpill() const
 
 void Join::insertFromBlockInternal(Block * stored_block, size_t stream_index)
 {
     size_t keys_size = key_names_right.size();
-    ColumnRawPtrs key_columns(keys_size);
 
     const Block & block = *stored_block;
 
     /// Rare case, when keys are constant. To avoid code bloat, simply materialize them.
     /// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function.
     Columns materialized_columns;
-
-    /// Memoize key columns to work.
-    for (size_t i = 0; i < keys_size; ++i)
-    {
-        key_columns[i] = block.getByName(key_names_right[i]).column.get();
-
-        if (ColumnPtr converted = key_columns[i]->convertToFullColumnIfConst())
-        {
-            materialized_columns.emplace_back(converted);
-            key_columns[i] = materialized_columns.back().get();
-        }
-    }
+    ColumnRawPtrs key_columns = extractAndMaterializeKeyColumns(block, materialized_columns, key_names_right);
 
     /// We will insert to the map only keys, where all components are not NULL.
     ColumnPtr null_map_holder;
@@ -1400,9 +1467,7 @@ void NO_INLINE joinBlockImplTypeCase(
     std::unique_ptr<IColumn::Offsets> & offsets_to_replicate,
     const std::vector<size_t> & right_indexes,
     const TiDB::TiDBCollators & collators,
-    bool enable_fine_grained_shuffle,
-    size_t fine_grained_shuffle_count,
-    bool enable_join_spill,
+    const JoinBuildInfo & join_build_info,
     ProbeProcessInfo & probe_process_info)
 {
     if (rows == 0)
@@ -1419,22 +1484,19 @@ void NO_INLINE joinBlockImplTypeCase(
     std::vector<String> sort_key_containers;
     sort_key_containers.resize(key_columns.size());
     Arena pool;
-    WeakHash32 shuffle_hash(0); /// reproduce hash values in FineGrainedShuffleWriter
-    if (enable_fine_grained_shuffle && rows > 0)
+    WeakHash32 build_hash(0); /// reproduce hash values according to build stage
+    if (join_build_info.needVirtualDispatchForProbeBlock())
     {
+        assert(!(join_build_info.restore_round > 0 && join_build_info.enable_fine_grained_shuffle));
         /// TODO: consider adding a virtual column in Sender side to avoid computing cost and potential inconsistency by heterogeneous envs(AMD64, ARM64)
         /// Note: 1. Not sure if inconsistency will happen in heterogeneous envs
         ///       2. Virtual column would take up a little more network bandwidth, might lead to poor performance if network was bottleneck
         /// Currently, the computation cost is tolerable, since it's a very simple crc32 hash algorithm, and heterogeneous envs support is not considered
-        HashBaseWriterHelper::computeHash(rows,
-                                          key_columns,
-                                          collators,
-                                          sort_key_containers,
-                                          shuffle_hash);
+        computeDispatchHash(rows, key_columns, collators, sort_key_containers, join_build_info.restore_round, build_hash);
     }
     size_t segment_size = map.getSegmentSize();
-    const auto & shuffle_hash_data = shuffle_hash.getData();
+    const auto & build_hash_data = build_hash.getData();
     assert(probe_process_info.start_row < rows);
     size_t i;
     bool block_full = false;
@@ -1463,11 +1525,11 @@ void NO_INLINE joinBlockImplTypeCase(
         }
 
         size_t segment_index = 0;
-        if (enable_join_spill)
+        if (join_build_info.is_spilled)
         {
             segment_index = probe_process_info.partition_index;
         }
-        else if (enable_fine_grained_shuffle)
+        else if (join_build_info.needVirtualDispatchForProbeBlock())
         {
             RUNTIME_CHECK(segment_size > 0);
             /// Need to calculate the correct segment_index so that rows with the same key will map to the same segment_index both in Build and Probe
@@ -1476,11 +1538,18 @@ void NO_INLINE joinBlockImplTypeCase(
             /// The "reproduce" logic is as follows:
             /// 1. In FineGrainedShuffleWriter, selector value finally maps to packet_stream_id by '% fine_grained_shuffle_count'
             /// 2. In ExchangeReceiver, build_stream_id = packet_stream_id % build_stream_count;
             /// 3. In HashBuild, build_concurrency decides map's segment size, and build_stream_id decides the segment index
-            auto packet_stream_id = shuffle_hash_data[i] % fine_grained_shuffle_count;
-            if likely (fine_grained_shuffle_count == segment_size)
-                segment_index = packet_stream_id;
+            if (join_build_info.enable_fine_grained_shuffle)
+            {
+                auto packet_stream_id = build_hash_data[i] % join_build_info.fine_grained_shuffle_count;
+                if likely (join_build_info.fine_grained_shuffle_count == segment_size)
+                    segment_index = packet_stream_id;
+                else
+                    segment_index = packet_stream_id % segment_size;
+            }
             else
-                segment_index = packet_stream_id % segment_size;
+            {
+                segment_index = build_hash_data[i] % join_build_info.build_concurrency;
+            }
         }
         else
         {
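Aside (illustration, not TiFlash code): the three numbered comments above compress a chain of modulo mappings. A self-contained sketch of how a probe row recomputes the hash map segment its key was built into:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    // 1. FineGrainedShuffleWriter: packet_stream_id = hash % fine_grained_shuffle_count
    // 2. ExchangeReceiver:         build_stream_id  = packet_stream_id % build_stream_count
    // 3. HashBuild:                the same derivation picks the map segment, so the
    //    build side and the probe side agree on where a given key lives.
    std::size_t segmentIndexFor(std::uint64_t hash, std::size_t fine_grained_shuffle_count, std::size_t segment_size)
    {
        const std::size_t packet_stream_id = hash % fine_grained_shuffle_count;
        return fine_grained_shuffle_count == segment_size ? packet_stream_id
                                                          : packet_stream_id % segment_size;
    }

    int main()
    {
        const std::uint64_t key_hash = 0x9e3779b97f4a7c15ULL;
        // Build and probe run the same function on the same hash, so they always
        // land on the same segment; the result is always within segment_size.
        assert(segmentIndexFor(key_hash, 8, 8) == segmentIndexFor(key_hash, 8, 8));
        assert(segmentIndexFor(key_hash, 16, 8) < 8);
        return 0;
    }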
@@ -1544,9 +1613,7 @@ void joinBlockImplType(
     std::unique_ptr<IColumn::Offsets> & offsets_to_replicate,
     const std::vector<size_t> & right_indexes,
     const TiDB::TiDBCollators & collators,
-    bool enable_fine_grained_shuffle,
-    bool enable_join_spill,
-    size_t fine_grained_shuffle_count,
+    const JoinBuildInfo & join_build_info,
     ProbeProcessInfo & probe_process_info)
 {
     if (null_map)
         joinBlockImplTypeCase<KIND, STRICTNESS, KeyGetter, Map, true>(
@@ -1562,9 +1629,7 @@ void joinBlockImplType(
             offsets_to_replicate,
             right_indexes,
             collators,
-            enable_fine_grained_shuffle,
-            fine_grained_shuffle_count,
-            enable_join_spill,
+            join_build_info,
             probe_process_info);
     else
         joinBlockImplTypeCase<KIND, STRICTNESS, KeyGetter, Map, false>(
@@ -1579,9 +1644,7 @@ void joinBlockImplType(
             offsets_to_replicate,
             right_indexes,
             collators,
-            enable_fine_grained_shuffle,
-            fine_grained_shuffle_count,
-            enable_join_spill,
+            join_build_info,
             probe_process_info);
 }
 } // namespace
@@ -1812,23 +1875,11 @@ template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename
 void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & probe_process_info) const
 {
     size_t keys_size = key_names_left.size();
-    ColumnRawPtrs key_columns(keys_size);
 
     /// Rare case, when keys are constant. To avoid code bloat, simply materialize them.
     /// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function.
     Columns materialized_columns;
-
-    /// Memoize key columns to work.
-    for (size_t i = 0; i < keys_size; ++i)
-    {
-        key_columns[i] = block.getByName(key_names_left[i]).column.get();
-
-        if (ColumnPtr converted = key_columns[i]->convertToFullColumnIfConst())
-        {
-            materialized_columns.emplace_back(converted);
-            key_columns[i] = materialized_columns.back().get();
-        }
-    }
+    ColumnRawPtrs key_columns = extractAndMaterializeKeyColumns(block, materialized_columns, key_names_left);
 
     /// Keys with NULL value in any column won't join to anything.
     ColumnPtr null_map_holder;
@@ -1908,6 +1959,7 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr
         offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
 
     bool enable_spill_join = isEnableSpill();
+    JoinBuildInfo join_build_info{enable_fine_grained_shuffle, fine_grained_shuffle_count, enable_spill_join, is_spilled, build_concurrency, restore_round};
     switch (type)
     {
 #define M(TYPE) \
@@ -1924,9 +1976,7 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr
             offsets_to_replicate, \
             right_indexes, \
             collators, \
-            enable_fine_grained_shuffle, \
-            enable_spill_join, \
-            fine_grained_shuffle_count, \
+            join_build_info, \
             probe_process_info); \
         break;
         APPLY_FOR_JOIN_VARIANTS(M)
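Aside (illustration, not TiFlash code): a compact restatement of the probe-side dispatch decision that the JoinBuildInfo struct added at the top of this file centralizes — fine grained shuffle always needs the virtual dispatch, while a spill-enabled join needs it only until it actually spills, after which probe blocks are physically partitioned instead:

    #include <cassert>

    struct JoinBuildInfoSketch
    {
        bool enable_fine_grained_shuffle;
        bool enable_spill;
        bool is_spilled;

        bool needVirtualDispatchForProbeBlock() const
        {
            return enable_fine_grained_shuffle || (enable_spill && !is_spilled);
        }
    };

    int main()
    {
        assert(JoinBuildInfoSketch{true, false, false}.needVirtualDispatchForProbeBlock());
        assert(JoinBuildInfoSketch{false, true, false}.needVirtualDispatchForProbeBlock());
        // Once spilled, rows go to real partitions, not virtual segments:
        assert(!JoinBuildInfoSketch{false, true, true}.needVirtualDispatchForProbeBlock());
        assert(!JoinBuildInfoSketch{false, false, false}.needVirtualDispatchForProbeBlock());
        return 0;
    }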
@@ -2472,18 +2522,6 @@ IColumn::Selector Join::hashToSelector(const WeakHash32 & hash) const
     size_t num_shards = build_concurrency;
     const auto & data = hash.getData();
     size_t num_rows = data.size();
-    std::vector<UInt64> hash_constants{0xff51afd7ed558ccdULL, 0xc4ceb9fe1a85ec53ULL, 0xde43a68e4d184aa3ULL, 0x86f1fda459fa47c7ULL, 0xd91419add64f471fULL, 0xc18eea9cbe12489eULL, 0x2cb94f36b9fe4c38ULL, 0xef0f50cc5f0c4cbaULL};
-    size_t hash_constants_size = hash_constants.size();
-    assert(hash_constants_size > 0 && (hash_constants_size & (hash_constants_size - 1)) == 0);
-
-    auto get_hash_value = [&](UInt64 x) {
-        x ^= x >> 33;
-        x *= hash_constants[restore_round & (hash_constants_size - 1)];
-        x ^= x >> 33;
-        x *= hash_constants[(restore_round + 1) & (hash_constants_size - 1)];
-        x ^= x >> 33;
-        return x;
-    };
 
     IColumn::Selector selector(num_rows);
@@ -2491,14 +2529,14 @@ IColumn::Selector Join::hashToSelector(const WeakHash32 & hash) const
     {
         for (size_t i = 0; i < num_rows; ++i)
         {
-            selector[i] = get_hash_value(data[i]) % num_shards;
+            selector[i] = data[i] % num_shards;
         }
     }
     else
     {
         for (size_t i = 0; i < num_rows; ++i)
         {
-            selector[i] = get_hash_value(data[i]) & (num_shards - 1);
+            selector[i] = data[i] & (num_shards - 1);
         }
     }
@@ -2507,14 +2545,15 @@ IColumn::Selector Join::hashToSelector(const WeakHash32 & hash) const
 
 IColumn::Selector Join::selectDispatchBlock(const Strings & key_columns_names, const Block & from_block)
 {
+    Columns materialized_columns;
+    ColumnRawPtrs key_columns = extractAndMaterializeKeyColumns(from_block, materialized_columns, key_columns_names);
+
     size_t num_rows = from_block.rows();
-    WeakHash32 hash(num_rows);
-    for (const auto & key_name : key_columns_names)
-    {
-        const auto & key_col = from_block.getByName(key_name).column->isColumnConst() ? from_block.getByName(key_name).column->convertToFullColumnIfConst() : from_block.getByName(key_name).column;
-        key_col->updateWeakHash32(hash);
-    }
+    std::vector<String> sort_key_containers;
+    sort_key_containers.resize(key_columns.size());
+    WeakHash32 hash(0);
+    computeDispatchHash(num_rows, key_columns, collators, sort_key_containers, restore_round, hash);
     return hashToSelector(hash);
 }
@@ -2545,6 +2584,7 @@ void Join::spillMostMemoryUsedPartitionIfNeed()
     RUNTIME_CHECK_MSG(restore_round < 4, "max_bytes_before_external_join is too small, join restore round exceeds limit, please make max_bytes_before_external_join larger and try again.");
     RUNTIME_CHECK_MSG(build_concurrency > 1, "Join with build concurrency = 1 does not support spill.");
 
+    is_spilled = true;
     LOG_DEBUG(log, fmt::format("all bytes used : {}", getTotalByteCount()));
     for (size_t index = 0; index < partitions.size(); ++index)
@@ -2801,17 +2841,17 @@ void Join::releaseBuildPartitionHashTable(size_t partition_index, std::unique_lo
     map_bytes += clearMapPartition(maps_any_full, type, partition_index);
     map_bytes += clearMapPartition(maps_all_full, type, partition_index);
     const auto & join_partition = partitions[partition_index];
-    size_t pool_bytes = join_partition->pool->size();
-    join_partition->pool.reset();
     if (getFullness(kind))
     {
         rows_not_inserted_to_map[partition_index].reset();
         rows_not_inserted_to_map[partition_index] = std::make_unique<RowRefList>();
     }
+    size_t pool_bytes = join_partition->pool->size();
+    join_partition->pool.reset();
     if likely (join_partition->memory_usage >= map_bytes + pool_bytes)
-        partitions[partition_index]->memory_usage -= (map_bytes + pool_bytes);
+        join_partition->memory_usage -= (map_bytes + pool_bytes);
     else
-        partitions[partition_index]->memory_usage = 0;
+        join_partition->memory_usage = 0;
 }
 
 void Join::releaseProbePartitionBlocks(size_t partition_index, std::unique_lock &)
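Aside (illustration, not TiFlash code): with the restore-round remix folded into computeDispatchHash, hashToSelector reduces to the plain modulo shown above. A sketch of that reduced selector, including the power-of-two fast path:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    std::vector<std::size_t> hashToSelector(const std::vector<std::uint64_t> & data, std::size_t num_shards)
    {
        // For power-of-two shard counts, & (num_shards - 1) equals % num_shards
        // but avoids the division.
        const bool power_of_two = (num_shards & (num_shards - 1)) == 0;
        std::vector<std::size_t> selector(data.size());
        for (std::size_t i = 0; i < data.size(); ++i)
            selector[i] = power_of_two ? (data[i] & (num_shards - 1)) : (data[i] % num_shards);
        return selector;
    }

    int main()
    {
        const auto s = hashToSelector({1, 8, 9, 15}, 8);
        assert(s[0] == 1 && s[1] == 0 && s[2] == 1 && s[3] == 7);
        return 0;
    }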
diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h
index 9d6d2ecc7f6..ae2a74c7a10 100644
--- a/dbms/src/Interpreters/Join.h
+++ b/dbms/src/Interpreters/Join.h
@@ -159,6 +159,8 @@ class Join
 
     bool hasPartitionSpilled();
 
+    bool isSpilled() const { return is_spilled; }
+
     std::tuple<size_t, BlockInputStreamPtr, BlockInputStreamPtr, BlockInputStreamPtr> getOneRestoreStream(size_t max_block_size);
 
     void dispatchProbeBlock(Block & block, std::list<std::tuple<size_t, Block>> & partition_blocks_list);
@@ -168,7 +170,9 @@ class Join
     /// Number of keys in all built JOIN maps.
     size_t getTotalRowCount() const;
     /// Sum size in bytes of all buffers, used for JOIN maps and for all memory pools.
-    size_t getTotalByteCount() const;
+    size_t getTotalByteCount();
+    /// The peak build bytes usage; if spill is not enabled, it is the same as getTotalByteCount
+    size_t getPeakBuildBytesUsage();
     /// size in bytes for partition hash map
     size_t getPartitionByteCount(size_t partition_index) const;
@@ -413,6 +417,8 @@ class Join
     SpillConfig build_spill_config;
     SpillConfig probe_spill_config;
     Int64 join_restore_concurrency;
+    bool is_spilled = false;
+    std::atomic<size_t> peak_build_bytes_usage{0};
     BlockInputStreams restore_build_streams;
     BlockInputStreams restore_probe_streams;
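Aside (illustration, not TiFlash code): the header now exposes two spill predicates with different lifetimes, which is exactly what the HashJoinProbeBlockInputStream change at the top of this patch relies on. A sketch of the distinction, assuming spill is enabled by a non-zero max_bytes_before_external_join:

    #include <cassert>

    struct JoinSpillState
    {
        // isEnableSpill(): configuration, fixed when the join is planned
        // (assumed here: max_bytes_before_external_join > 0).
        bool spill_enabled = false;
        // isSpilled(): runtime fact, set in spillMostMemoryUsedPartitionIfNeed()
        // once the first partition actually goes to disk, and never cleared.
        bool spilled = false;
    };

    int main()
    {
        JoinSpillState state;
        state.spill_enabled = true; // spill may happen
        assert(!state.spilled);     // but has not happened yet: probe reads normally
        state.spilled = true;       // first spill occurred: probe goes partition by partition
        assert(state.spill_enabled && state.spilled);
        return 0;
    }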