diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
index 1bd1f57962d..9d7a87eabf1 100644
--- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
+++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
@@ -70,7 +70,7 @@ static void enrichBlockWithConstants(Block & block, const Block & header)
 
 MergeSortingBlockInputStream::MergeSortingBlockInputStream(
     const BlockInputStreamPtr & input,
-    SortDescription & description_,
+    const SortDescription & description_,
     size_t max_merged_block_size_,
     size_t limit_,
     size_t max_bytes_before_external_sort_,
diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h
index 20d4d374ad2..dd91576f8e1 100644
--- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h
+++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h
@@ -92,7 +92,7 @@ class MergeSortingBlockInputStream : public IProfilingBlockInputStream
     /// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
     MergeSortingBlockInputStream(
         const BlockInputStreamPtr & input,
-        SortDescription & description_,
+        const SortDescription & description_,
         size_t max_merged_block_size_,
         size_t limit_,
         size_t max_bytes_before_external_sort_,
diff --git a/dbms/src/DataStreams/PartialSortingBlockInputStream.h b/dbms/src/DataStreams/PartialSortingBlockInputStream.h
index 14badb944a5..ccf3fd91dd3 100644
--- a/dbms/src/DataStreams/PartialSortingBlockInputStream.h
+++ b/dbms/src/DataStreams/PartialSortingBlockInputStream.h
@@ -30,7 +30,7 @@ class PartialSortingBlockInputStream : public IProfilingBlockInputStream
     /// limit - if not 0, then you can sort each block not completely, but only `limit` first rows by order.
     PartialSortingBlockInputStream(
         const BlockInputStreamPtr & input_,
-        SortDescription & description_,
+        const SortDescription & description_,
         const String & req_id,
         size_t limit_ = 0)
         : description(description_)
diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
index 77cbdbada2f..7adb88b3d6b 100644
--- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -268,18 +268,23 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
 
     auto & join_execute_info = dagContext().getJoinExecuteInfoMap()[query_block.source_name];
 
-    size_t join_build_concurrency = settings.join_concurrent_build ? std::min(max_streams, build_pipeline.streams.size()) : 1;
+    size_t join_build_concurrency = std::max(build_pipeline.streams.size(), build_pipeline.streams_with_non_joined_data.size());
 
     /// build side streams
     executeExpression(build_pipeline, build_side_prepare_actions, log, "append join key and join filters for build side");
     // add a HashJoinBuildBlockInputStream to build a shared hash table
-    auto get_concurrency_build_index = JoinInterpreterHelper::concurrencyBuildIndexGenerator(join_build_concurrency);
-    build_pipeline.transform([&](auto & stream) {
-        stream = std::make_shared<HashJoinBuildBlockInputStream>(stream, join_ptr, get_concurrency_build_index(), log->identifier());
-        stream->setExtraInfo(
-            fmt::format("join build, build_side_root_executor_id = {}", dagContext().getJoinExecuteInfoMap()[query_block.source_name].build_side_root_executor_id));
-        join_execute_info.join_build_streams.push_back(stream);
-    });
+    auto build_streams = [&](BlockInputStreams & streams) {
+        size_t build_index = 0;
+        for (auto & stream : streams)
+        {
+            stream = std::make_shared<HashJoinBuildBlockInputStream>(stream, join_ptr, build_index++, log->identifier());
+            stream->setExtraInfo(
+                fmt::format("join build, build_side_root_executor_id = {}", dagContext().getJoinExecuteInfoMap()[query_block.source_name].build_side_root_executor_id));
+            join_execute_info.join_build_streams.push_back(stream);
+        }
+    };
+    build_streams(build_pipeline.streams);
+    build_streams(build_pipeline.streams_with_non_joined_data);
 
     // for test, join executor need the return blocks to output.
     executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/!context.isTest(), "for join");
diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
index 3da680c925e..d71f8b073d1 100644
--- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
+++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
@@ -112,7 +112,7 @@ void executeExpression(
 void orderStreams(
     DAGPipeline & pipeline,
     size_t max_streams,
-    SortDescription order_descr,
+    const SortDescription & order_descr,
     Int64 limit,
     bool enable_fine_grained_shuffle,
     const Context & context,
diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h
index e2ebcbc5395..87672e81dfa 100644
--- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h
+++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h
@@ -55,7 +55,7 @@ void executeExpression(
 void orderStreams(
     DAGPipeline & pipeline,
     size_t max_streams,
-    SortDescription order_descr,
+    const SortDescription & order_descr,
     Int64 limit,
     bool enable_fine_grained_shuffle,
     const Context & context,
diff --git a/dbms/src/Flash/Coprocessor/JoinInterpreterHelper.cpp b/dbms/src/Flash/Coprocessor/JoinInterpreterHelper.cpp
index 2582a84ac46..2fda680d9e1 100644
--- a/dbms/src/Flash/Coprocessor/JoinInterpreterHelper.cpp
+++ b/dbms/src/Flash/Coprocessor/JoinInterpreterHelper.cpp
@@ -345,12 +345,4 @@ std::tuple<ExpressionActionsPtr, Names, String> prepareJoin(
     dag_analyzer.appendJoinKeyAndJoinFilters(chain, keys, key_types, key_names, left, is_right_out_join, filters, filter_column_name);
     return {chain.getLastActions(), std::move(key_names), std::move(filter_column_name)};
 }
-
-std::function<size_t()> concurrencyBuildIndexGenerator(size_t join_build_concurrency)
-{
-    size_t init_value = 0;
-    return [init_value, join_build_concurrency]() mutable {
-        return (init_value++) % join_build_concurrency;
-    };
-}
 } // namespace DB::JoinInterpreterHelper
\ No newline at end of file
diff --git a/dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h b/dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
index 610a8a54c2d..7a1d1a84efc 100644
--- a/dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
+++ b/dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
@@ -127,7 +127,5 @@ std::tuple<ExpressionActionsPtr, Names, String> prepareJoin(
     bool left,
     bool is_right_out_join,
     const google::protobuf::RepeatedPtrField<tipb::Expr> & filters);
-
-std::function<size_t()> concurrencyBuildIndexGenerator(size_t join_build_concurrency);
 } // namespace JoinInterpreterHelper
 } // namespace DB
diff --git a/dbms/src/Flash/Planner/plans/PhysicalJoin.cpp b/dbms/src/Flash/Planner/plans/PhysicalJoin.cpp
index c0cb2904d1c..c46bbbe4941 100644
--- a/dbms/src/Flash/Planner/plans/PhysicalJoin.cpp
+++ b/dbms/src/Flash/Planner/plans/PhysicalJoin.cpp
@@ -214,21 +214,24 @@ void PhysicalJoin::probeSideTransform(DAGPipeline & probe_pipeline, Context & co
 void PhysicalJoin::buildSideTransform(DAGPipeline & build_pipeline, Context & context, size_t max_streams)
 {
     auto & dag_context = *context.getDAGContext();
-    const auto & settings = context.getSettingsRef();
-
-    size_t join_build_concurrency = settings.join_concurrent_build ? std::min(max_streams, build_pipeline.streams.size()) : 1;
+    size_t join_build_concurrency = std::max(build_pipeline.streams.size(), build_pipeline.streams_with_non_joined_data.size());
 
     /// build side streams
     executeExpression(build_pipeline, build_side_prepare_actions, log, "append join key and join filters for build side");
     // add a HashJoinBuildBlockInputStream to build a shared hash table
-    auto get_concurrency_build_index = JoinInterpreterHelper::concurrencyBuildIndexGenerator(join_build_concurrency);
     String join_build_extra_info = fmt::format("join build, build_side_root_executor_id = {}", build()->execId());
     auto & join_execute_info = dag_context.getJoinExecuteInfoMap()[execId()];
-    build_pipeline.transform([&](auto & stream) {
-        stream = std::make_shared<HashJoinBuildBlockInputStream>(stream, join_ptr, get_concurrency_build_index(), log->identifier());
-        stream->setExtraInfo(join_build_extra_info);
-        join_execute_info.join_build_streams.push_back(stream);
-    });
+    auto build_streams = [&](BlockInputStreams & streams) {
+        size_t build_index = 0;
+        for (auto & stream : streams)
+        {
+            stream = std::make_shared<HashJoinBuildBlockInputStream>(stream, join_ptr, build_index++, log->identifier());
+            stream->setExtraInfo(join_build_extra_info);
+            join_execute_info.join_build_streams.push_back(stream);
+        }
+    };
+    build_streams(build_pipeline.streams);
+    build_streams(build_pipeline.streams_with_non_joined_data);
 
     // for test, join executor need the return blocks to output.
     executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/!context.isTest(), "for join");
diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h
index 10c0dedb32b..57e3a724c3d 100644
--- a/dbms/src/Interpreters/Settings.h
+++ b/dbms/src/Interpreters/Settings.h
@@ -339,7 +339,6 @@ struct Settings
     M(SettingUInt64, max_bytes_in_distinct, 0, "Maximum total size of state (in uncompressed bytes) in memory for the execution of DISTINCT.") \
     M(SettingOverflowMode, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \
     \
-    M(SettingBool, join_concurrent_build, true, "Build hash table concurrently for join.") \
     M(SettingUInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. Zero means unlimited.") \
     M(SettingUInt64, max_memory_usage_for_user, 0, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.") \
     M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Maximum memory usage for processing all concurrently running queries on the server. Zero means unlimited.") \
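
Net effect of the patch: the `join_concurrent_build` setting and the round-robin `concurrencyBuildIndexGenerator` helper are removed; instead, every build-side stream (including `streams_with_non_joined_data`) gets a distinct build index by simple counting, and `join_build_concurrency` is derived from the stream lists themselves. The standalone sketch below illustrates the indexing change outside the interpreter; the stream names and counts are hypothetical stand-ins for `build_pipeline.streams` and `build_pipeline.streams_with_non_joined_data`, and the generator is reproduced from the removed helper.

#include <algorithm>
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Old scheme (removed by this patch): a stateful generator handing out
// indices round-robin modulo join_build_concurrency, so streams could
// share an index (all of them, when the setting forced concurrency to 1).
std::function<size_t()> concurrencyBuildIndexGenerator(size_t join_build_concurrency)
{
    size_t init_value = 0;
    return [init_value, join_build_concurrency]() mutable {
        return (init_value++) % join_build_concurrency;
    };
}

int main()
{
    // Hypothetical stream lists standing in for the pipeline's streams.
    std::vector<std::string> streams{"s0", "s1", "s2"};
    std::vector<std::string> non_joined{"n0", "n1"};

    // Old behavior with join_concurrent_build = false: every stream -> 0.
    auto gen = concurrencyBuildIndexGenerator(/*join_build_concurrency=*/1);
    for (const auto & s : streams)
        std::cout << "old: " << s << " -> " << gen() << '\n';

    // New scheme: concurrency is derived from the larger stream list, and
    // each list is numbered 0, 1, 2, ... so every stream in a list gets a
    // distinct build index.
    size_t join_build_concurrency = std::max(streams.size(), non_joined.size());
    auto build_streams = [&](const std::vector<std::string> & list) {
        size_t build_index = 0;
        for (const auto & s : list)
            std::cout << "new: " << s << " -> " << build_index++ << '\n';
    };
    build_streams(streams);     // s0 -> 0, s1 -> 1, s2 -> 2
    build_streams(non_joined);  // n0 -> 0, n1 -> 1
    std::cout << "join_build_concurrency = " << join_build_concurrency << '\n';
}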