From 9e7299e9c2628fc2259f4abc12393b4b319272ea Mon Sep 17 00:00:00 2001 From: SeaRise Date: Sat, 9 Jul 2022 00:54:49 +0800 Subject: [PATCH 1/7] u --- dbms/src/Debug/astToExecutor.cpp | 9 +++++++++ dbms/src/Flash/Coprocessor/DAGContext.cpp | 5 +++++ dbms/src/Flash/Coprocessor/DAGContext.h | 2 ++ 3 files changed, 16 insertions(+) diff --git a/dbms/src/Debug/astToExecutor.cpp b/dbms/src/Debug/astToExecutor.cpp index 481eac65fe2..6cbba6efe99 100644 --- a/dbms/src/Debug/astToExecutor.cpp +++ b/dbms/src/Debug/astToExecutor.cpp @@ -170,6 +170,7 @@ std::unordered_map func_name_to_sig({ {"cast_decimal_datetime", tipb::ScalarFuncSig::CastDecimalAsTime}, {"cast_time_datetime", tipb::ScalarFuncSig::CastTimeAsTime}, {"cast_string_datetime", tipb::ScalarFuncSig::CastStringAsTime}, + {"concat", tipb::ScalarFuncSig::Concat}, {"round_int", tipb::ScalarFuncSig::RoundInt}, {"round_uint", tipb::ScalarFuncSig::RoundInt}, {"round_dec", tipb::ScalarFuncSig::RoundDec}, @@ -461,6 +462,14 @@ void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr ft->set_collate(collator_id); break; } + case tipb::ScalarFuncSig::Concat: + { + expr->set_sig(it_sig->second); + auto * ft = expr->mutable_field_type(); + ft->set_tp(TiDB::TypeString); + ft->set_collate(collator_id); + break; + } case tipb::ScalarFuncSig::RoundInt: case tipb::ScalarFuncSig::RoundWithFracInt: { diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index ec0544c6ee4..7877ea932eb 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -75,6 +75,11 @@ std::unordered_map & DAGContext::getProfileStreamsMap return profile_streams_map; } +void DAGContext::updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit) +{ + final_concurrency = std::min(std::max(final_concurrency, cur_streams_size), streams_upper_limit); +} + void DAGContext::initExecutorIdToJoinIdMap() { // only mpp task has join executor diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index a50a4d4007b..5878eaf3f2a 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -303,6 +303,8 @@ class DAGContext return sql_mode & f; } + void updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit); + bool isTest() const { return is_test; } void setColumnsForTest(std::unordered_map & columns_for_test_map_) { columns_for_test_map = columns_for_test_map_; } ColumnsWithTypeAndName columnsForTest(String executor_id); From 38349d6f4efa9acb6db90bdb71241033f9736806 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Sat, 9 Jul 2022 01:00:30 +0800 Subject: [PATCH 2/7] u2 --- dbms/src/Interpreters/Settings.h | 10 +++++----- dbms/src/TestUtils/mockExecutor.h | 1 + 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 9361e0525d2..add761c581d 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -209,7 +209,7 @@ struct Settings * Basically, limits are checked for each block (not every row). That is, the limits can be slightly violated. \ * Almost all limits apply only to SELECTs. \ * Almost all limits apply to each stream individually. \ - */ \ + */ \ \ M(SettingUInt64, max_rows_to_read, 0, "Limit on read rows from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it " \ "is only checked on a remote server.") \ @@ -272,7 +272,7 @@ struct Settings M(SettingUInt64, dt_segment_delta_small_column_file_size, 8388608, "Determine whether a column file in delta is small or not. 8MB by default.") \ M(SettingUInt64, dt_segment_stable_pack_rows, DEFAULT_MERGE_BLOCK_SIZE, "Expected stable pack rows in DeltaTree Engine.") \ M(SettingFloat, dt_segment_wait_duration_factor, 1, "The factor of wait duration in a write stall.") \ - M(SettingUInt64, dt_bg_gc_check_interval, 60, "Background gc thread check interval, the unit is second.") \ + M(SettingUInt64, dt_bg_gc_check_interval, 60, "Background gc thread check interval, the unit is second.") \ M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 100, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.") \ M(SettingFloat, dt_bg_gc_ratio_threhold_to_trigger_gc, 1.2, "Trigger segment's gc when the ratio of invalid version exceed this threhold. Values smaller than or equal to 1.0 means gc all " \ "segments") \ @@ -355,15 +355,15 @@ struct Settings M(SettingUInt64, elastic_threadpool_init_cap, 400, "The size of elastic thread pool.") \ M(SettingUInt64, elastic_threadpool_shrink_period_ms, 300000, "The shrink period(ms) of elastic thread pool.") \ M(SettingBool, enable_local_tunnel, true, "Enable local data transfer between local MPP tasks.") \ - M(SettingBool, enable_async_grpc_client, true, "Enable async grpc in MPP.") \ - M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means using hardware_concurrency.")\ + M(SettingBool, enable_async_grpc_client, true, "Enable async grpc in MPP.") \ + M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means using hardware_concurrency.") \ M(SettingBool, enable_async_server, true, "Enable async rpc server.") \ M(SettingUInt64, async_pollers_per_cq, 200, "grpc async pollers per cqs") \ M(SettingUInt64, async_cqs, 1, "grpc async cqs") \ M(SettingUInt64, preallocated_request_count_per_poller, 20, "grpc preallocated_request_count_per_poller") \ \ M(SettingUInt64, manual_compact_pool_size, 1, "The number of worker threads to handle manual compact requests.") \ - M(SettingUInt64, manual_compact_max_concurrency, 10, "Max concurrent tasks. It should be larger than pool size.") \ + M(SettingUInt64, manual_compact_max_concurrency, 10, "Max concurrent tasks. It should be larger than pool size.") \ M(SettingUInt64, manual_compact_more_until_ms, 60000, "Continuously compact more segments until reaching specified elapsed time. If 0 is specified, only one segment will be compacted each round.") // clang-format on diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index 5f752e58da6..1f7bd2322fa 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -166,6 +166,7 @@ MockWindowFrame buildDefaultRowsFrame(); #define col(name) buildColumn((name)) #define lit(field) buildLiteral((field)) +#define concat(expr1, expr2) makeASTFunction("concat", (expr1), (expr2)) #define eq(expr1, expr2) makeASTFunction("equals", (expr1), (expr2)) #define Not_eq(expr1, expr2) makeASTFunction("notEquals", (expr1), (expr2)) #define lt(expr1, expr2) makeASTFunction("less", (expr1), (expr2)) From edac6bb33dd9dcb79467a64b5d6a3429b63d03e2 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Sat, 9 Jul 2022 01:03:58 +0800 Subject: [PATCH 3/7] u3 --- dbms/src/DataStreams/TiRemoteBlockInputStream.h | 13 +++---------- dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp | 13 ++++++++++++- dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h | 4 +++- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index f249bf1a0dc..76fda0b57d0 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -176,21 +177,13 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream , log(Logger::get(name, req_id, executor_id)) , total_rows(0) { - // generate sample block - ColumnsWithTypeAndName columns; - for (auto & dag_col : remote_reader->getOutputSchema()) - { - auto tp = getDataTypeByColumnInfoForComputingLayer(dag_col.second); - ColumnWithTypeAndName col(tp, dag_col.first); - columns.emplace_back(col); - } - for (size_t i = 0; i < source_num; i++) + for (size_t i = 0; i < source_num; ++i) { execution_summaries_inited[i].store(false); } execution_summaries.resize(source_num); connection_profile_infos.resize(source_num); - sample_block = Block(columns); + sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema()))); } Block getHeader() const override { return sample_block; } diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp index be3475f714f..efb8a08f1d8 100644 --- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp +++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp @@ -54,4 +54,15 @@ ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_ } return column_with_type_and_names; } -} // namespace DB \ No newline at end of file + +NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema) +{ + NamesAndTypes names_and_types; + for (const auto & col : dag_schema) + { + auto tp = getDataTypeByColumnInfoForComputingLayer(col.second); + names_and_types.emplace_back(col.first, tp); + } + return names_and_types; +} +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h index 617f69de925..96f202d800e 100644 --- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h +++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -23,4 +24,5 @@ namespace DB { NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan); ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types); -} // namespace DB \ No newline at end of file +NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema); +} // namespace DB From f364ad390805cbccca4eb7c55bcc812a31a4d894 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Sat, 9 Jul 2022 01:25:57 +0800 Subject: [PATCH 4/7] u4 --- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 71 +++---------------- .../Coprocessor/DAGQueryBlockInterpreter.h | 2 - .../Flash/Coprocessor/InterpreterUtils.cpp | 54 ++++++++++++++ dbms/src/Flash/Coprocessor/InterpreterUtils.h | 15 ++++ 4 files changed, 79 insertions(+), 63 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index bf695da34c1..aae0270d964 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -268,7 +267,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & size_t join_build_concurrency = settings.join_concurrent_build ? std::min(max_streams, build_pipeline.streams.size()) : 1; /// build side streams - executeExpression(build_pipeline, build_side_prepare_actions, "append join key and join filters for build side"); + executeExpression(build_pipeline, build_side_prepare_actions, log, "append join key and join filters for build side"); // add a HashJoinBuildBlockInputStream to build a shared hash table auto get_concurrency_build_index = JoinInterpreterHelper::concurrencyBuildIndexGenerator(join_build_concurrency); build_pipeline.transform([&](auto & stream) { @@ -284,7 +283,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & join_ptr->init(right_query.source->getHeader(), join_build_concurrency); /// probe side streams - executeExpression(probe_pipeline, probe_side_prepare_actions, "append join key and join filters for probe side"); + executeExpression(probe_pipeline, probe_side_prepare_actions, log, "append join key and join filters for probe side"); NamesAndTypes source_columns; for (const auto & p : probe_pipeline.firstStream()->getHeader()) source_columns.emplace_back(p.name, p.type); @@ -349,7 +348,7 @@ void DAGQueryBlockInterpreter::executeWindow( DAGPipeline & pipeline, WindowDescription & window_description) { - executeExpression(pipeline, window_description.before_window, "before window"); + executeExpression(pipeline, window_description.before_window, log, "before window"); /// If there are several streams, we merge them into one executeUnion(pipeline, max_streams, log, false, "merge into one for window input"); @@ -365,10 +364,7 @@ void DAGQueryBlockInterpreter::executeAggregation( AggregateDescriptions & aggregate_descriptions, bool is_final_agg) { - pipeline.transform([&](auto & stream) { - stream = std::make_shared(stream, expression_actions_ptr, log->identifier()); - stream->setExtraInfo("before aggregation"); - }); + executeExpression(pipeline, expression_actions_ptr, log, "before aggregation"); Block before_agg_header = pipeline.firstStream()->getHeader(); @@ -426,56 +422,15 @@ void DAGQueryBlockInterpreter::executeAggregation( } } -void DAGQueryBlockInterpreter::executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, const String & extra_info) -{ - if (!expressionActionsPtr->getActions().empty()) - { - pipeline.transform([&](auto & stream) { - stream = std::make_shared(stream, expressionActionsPtr, log->identifier()); - stream->setExtraInfo(extra_info); - }); - } -} - void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc) { - orderStreams(pipeline, sort_desc, 0); + orderStreams(pipeline, max_streams, sort_desc, 0, context, log); } void DAGQueryBlockInterpreter::executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns) { Int64 limit = query_block.limit_or_topn->topn().limit(); - orderStreams(pipeline, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit); -} - -void DAGQueryBlockInterpreter::orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit) -{ - const Settings & settings = context.getSettingsRef(); - - pipeline.transform([&](auto & stream) { - auto sorting_stream = std::make_shared(stream, order_descr, log->identifier(), limit); - - /// Limits on sorting - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; - limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode); - sorting_stream->setLimits(limits); - - stream = sorting_stream; - }); - - /// If there are several streams, we merge them into one - executeUnion(pipeline, max_streams, log, false, "for partial order"); - - /// Merge the sorted blocks. - pipeline.firstStream() = std::make_shared( - pipeline.firstStream(), - order_descr, - settings.max_block_size, - limit, - settings.max_bytes_before_external_sort, - context.getTemporaryPath(), - log->identifier()); + orderStreams(pipeline, max_streams, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit, context, log); } void DAGQueryBlockInterpreter::recordProfileStreams(DAGPipeline & pipeline, const String & key) @@ -553,10 +508,7 @@ void DAGQueryBlockInterpreter::handleProjection(DAGPipeline & pipeline, const ti output_columns.emplace_back(alias, col.type); project_cols.emplace_back(col.name, alias); } - pipeline.transform([&](auto & stream) { - stream = std::make_shared(stream, chain.getLastActions(), log->identifier()); - stream->setExtraInfo("before projection"); - }); + executeExpression(pipeline, chain.getLastActions(), log, "before projection"); executeProject(pipeline, project_cols, "projection"); analyzer = std::make_unique(std::move(output_columns), context); } @@ -571,7 +523,7 @@ void DAGQueryBlockInterpreter::handleWindow(DAGPipeline & pipeline, const tipb:: DAGExpressionAnalyzer dag_analyzer(input_columns, context); WindowDescription window_description = dag_analyzer.buildWindowDescription(window); executeWindow(pipeline, window_description); - executeExpression(pipeline, window_description.after_window, "cast after window"); + executeExpression(pipeline, window_description.after_window, log, "cast after window"); analyzer = std::make_unique(window_description.after_window_columns, context); } @@ -683,7 +635,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) } if (res.before_order_and_select) { - executeExpression(pipeline, res.before_order_and_select, "before order and select"); + executeExpression(pipeline, res.before_order_and_select, log, "before order and select"); } if (!res.order_columns.empty()) @@ -719,10 +671,7 @@ void DAGQueryBlockInterpreter::executeProject(DAGPipeline & pipeline, NamesWithA if (project_cols.empty()) return; ExpressionActionsPtr project = generateProjectExpressionActions(pipeline.firstStream(), context, project_cols); - pipeline.transform([&](auto & stream) { - stream = std::make_shared(stream, project, log->identifier()); - stream->setExtraInfo(extra_info); - }); + executeExpression(pipeline, project, log, extra_info); } void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index e68c4f91cee..cabdd4dc9be 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -67,9 +67,7 @@ class DAGQueryBlockInterpreter void handleWindow(DAGPipeline & pipeline, const tipb::Window & window); void handleWindowOrder(DAGPipeline & pipeline, const tipb::Sort & window_sort); void executeWhere(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column, const String & extra_info = ""); - void executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, const String & extra_info = ""); void executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc); - void orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit); void executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns); void executeLimit(DAGPipeline & pipeline); void executeWindow( diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 6415d36389b..c747823b69d 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include #include #include #include @@ -88,4 +91,55 @@ ExpressionActionsPtr generateProjectExpressionActions( project->add(ExpressionAction::project(project_cols)); return project; } + +void executeExpression( + DAGPipeline & pipeline, + const ExpressionActionsPtr & expr_actions, + const LoggerPtr & log, + const String & extra_info) +{ + if (expr_actions && !expr_actions->getActions().empty()) + { + pipeline.transform([&](auto & stream) { + stream = std::make_shared(stream, expr_actions, log->identifier()); + stream->setExtraInfo(extra_info); + }); + } +} + +void orderStreams( + DAGPipeline & pipeline, + size_t max_streams, + SortDescription order_descr, + Int64 limit, + const Context & context, + const LoggerPtr & log) +{ + const Settings & settings = context.getSettingsRef(); + + pipeline.transform([&](auto & stream) { + auto sorting_stream = std::make_shared(stream, order_descr, log->identifier(), limit); + + /// Limits on sorting + IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; + limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode); + sorting_stream->setLimits(limits); + + stream = sorting_stream; + }); + + /// If there are several streams, we merge them into one + executeUnion(pipeline, max_streams, log, false, "for partial order"); + + /// Merge the sorted blocks. + pipeline.firstStream() = std::make_shared( + pipeline.firstStream(), + order_descr, + settings.max_block_size, + limit, + settings.max_bytes_before_external_sort, + context.getTemporaryPath(), + log->identifier()); +} } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h index 5c4d4721d5e..36280f3b903 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include @@ -44,4 +45,18 @@ ExpressionActionsPtr generateProjectExpressionActions( const BlockInputStreamPtr & stream, const Context & context, const NamesWithAliases & project_cols); + +void executeExpression( + DAGPipeline & pipeline, + const ExpressionActionsPtr & expr_actions, + const LoggerPtr & log, + const String & extra_info = ""); + +void orderStreams( + DAGPipeline & pipeline, + size_t max_streams, + SortDescription order_descr, + Int64 limit, + const Context & context, + const LoggerPtr & log); } // namespace DB From 6d8f89c385d6bf1cf79c75928143d9b40d0c12ee Mon Sep 17 00:00:00 2001 From: SeaRise Date: Sat, 9 Jul 2022 01:50:03 +0800 Subject: [PATCH 5/7] u5 --- .../Coprocessor/DAGExpressionAnalyzer.cpp | 32 ++++++---- .../Flash/Coprocessor/DAGExpressionAnalyzer.h | 58 +++++++++++-------- 2 files changed, 54 insertions(+), 36 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index aa269469cdb..5fbd86e9762 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -1130,30 +1130,40 @@ NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock( const std::vector & output_offsets, const String & column_prefix, bool keep_session_timezone_info) +{ + auto & step = initAndGetLastStep(chain); + + NamesWithAliases final_project = buildFinalProjection(step.actions, schema, output_offsets, column_prefix, keep_session_timezone_info); + + for (const auto & name : final_project) + { + step.required_output.push_back(name.first); + } + return final_project; +} + +NamesWithAliases DAGExpressionAnalyzer::buildFinalProjection( + const ExpressionActionsPtr & actions, + const std::vector & schema, + const std::vector & output_offsets, + const String & column_prefix, + bool keep_session_timezone_info) { if (unlikely(output_offsets.empty())) - throw Exception("Root Query block without output_offsets", ErrorCodes::LOGICAL_ERROR); + throw Exception("DAGRequest without output_offsets", ErrorCodes::LOGICAL_ERROR); bool need_append_timezone_cast = !keep_session_timezone_info && !context.getTimezoneInfo().is_utc_timezone; auto [need_append_type_cast, need_append_type_cast_vec] = isCastRequiredForRootFinalProjection(schema, output_offsets); assert(need_append_type_cast_vec.size() == output_offsets.size()); - auto & step = initAndGetLastStep(chain); - if (need_append_timezone_cast || need_append_type_cast) { // after appendCastForRootFinalProjection, source_columns has been modified. - appendCastForRootFinalProjection(step.actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec); + appendCastForRootFinalProjection(actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec); } // generate project aliases from source_columns. - NamesWithAliases final_project = genRootFinalProjectAliases(column_prefix, output_offsets); - - for (const auto & name : final_project) - { - step.required_output.push_back(name.first); - } - return final_project; + return genRootFinalProjectAliases(column_prefix, output_offsets); } String DAGExpressionAnalyzer::alignReturnType( diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h index 046088ab2b2..63d35abe26d 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h @@ -102,6 +102,8 @@ class DAGExpressionAnalyzer : private boost::noncopyable ExpressionActionsChain & chain, const String & column_prefix) const; + NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const; + // Generate a project action for root DAGQueryBlock, // to keep the schema of Block and tidb-schema the same. NamesWithAliases appendFinalProjectForRootQueryBlock( @@ -111,6 +113,13 @@ class DAGExpressionAnalyzer : private boost::noncopyable const String & column_prefix, bool keep_session_timezone_info); + NamesWithAliases buildFinalProjection( + const ExpressionActionsPtr & actions, + const std::vector & schema, + const std::vector & output_offsets, + const String & column_prefix, + bool keep_session_timezone_info); + String getActions( const tipb::Expr & expr, const ExpressionActionsPtr & actions, @@ -153,17 +162,38 @@ class DAGExpressionAnalyzer : private boost::noncopyable const tipb::Window & window, size_t window_columns_start_index); -#ifndef DBMS_PUBLIC_GTEST -private: -#endif NamesAndTypes buildOrderColumns( const ExpressionActionsPtr & actions, const ::google::protobuf::RepeatedPtrField & order_by); + String buildFilterColumn( + const ExpressionActionsPtr & actions, + const std::vector & conditions); + + void buildAggFuncs( + const tipb::Aggregation & aggregation, + const ExpressionActionsPtr & actions, + AggregateDescriptions & aggregate_descriptions, + NamesAndTypes & aggregated_columns); + + void buildAggGroupBy( + const google::protobuf::RepeatedPtrField & group_by, + const ExpressionActionsPtr & actions, + AggregateDescriptions & aggregate_descriptions, + NamesAndTypes & aggregated_columns, + Names & aggregation_keys, + std::unordered_set & agg_key_set, + bool group_by_collation_sensitive, + TiDB::TiDBCollators & collators); + void appendCastAfterAgg( const ExpressionActionsPtr & actions, const tipb::Aggregation & agg); +#ifndef DBMS_PUBLIC_GTEST +private: +#endif + String buildTupleFunctionForGroupConcat( const tipb::Expr & expr, SortDescription & sort_desc, @@ -187,22 +217,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable NamesAndTypes & aggregated_columns, bool empty_input_as_null); - void buildAggFuncs( - const tipb::Aggregation & aggregation, - const ExpressionActionsPtr & actions, - AggregateDescriptions & aggregate_descriptions, - NamesAndTypes & aggregated_columns); - - void buildAggGroupBy( - const google::protobuf::RepeatedPtrField & group_by, - const ExpressionActionsPtr & actions, - AggregateDescriptions & aggregate_descriptions, - NamesAndTypes & aggregated_columns, - Names & aggregation_keys, - std::unordered_set & agg_key_set, - bool group_by_collation_sensitive, - TiDB::TiDBCollators & collators); - void fillArgumentDetail( const ExpressionActionsPtr & actions, const tipb::Expr & arg, @@ -275,12 +289,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable const ExpressionActionsPtr & actions, const String & column_name); - String buildFilterColumn( - const ExpressionActionsPtr & actions, - const std::vector & conditions); - - NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const; - NamesWithAliases genRootFinalProjectAliases( const String & column_prefix, const std::vector & output_offsets) const; From fbb3d221f9044a61f868a4dbfbc3fae2fdd1d3a7 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Sat, 9 Jul 2022 02:09:12 +0800 Subject: [PATCH 6/7] fix tests --- dbms/src/Flash/tests/gtest_interpreter.cpp | 214 +++++++++------------ 1 file changed, 93 insertions(+), 121 deletions(-) diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_interpreter.cpp index c583fbf35c6..7aba12d7ec8 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_interpreter.cpp @@ -103,15 +103,12 @@ try Union: Expression x 10: Expression: - Expression: - Expression: - Expression: - Expression: + Expression: + Expression: + Expression: + Expression: Expression: - Expression: - Expression: - Expression: - MockTableScan)"; + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -125,16 +122,14 @@ Union: Union: Expression x 10: Expression: - Expression: - SharedQuery: - Expression: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - Expression: - Expression: - MockTableScan)"; + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -150,22 +145,18 @@ Union: Union: Expression x 10: Expression: - Expression: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - Expression: - SharedQuery: - Expression: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - Expression: - Expression: - MockTableScan)"; + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -188,27 +179,22 @@ Union: Limit x 10, limit = 10 Expression: Expression: - Expression: - Expression: - Expression: - Filter: - Expression: - Expression: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - Expression: - SharedQuery: - Expression: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - Expression: - Expression: - MockTableScan)"; + Expression: + Expression: + Filter: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -222,15 +208,12 @@ Union: Union: Expression x 10: Expression: - Expression: - Expression: - Expression: - Expression: + Expression: + Expression: + Expression: + Expression: Expression: - Expression: - Expression: - Expression: - MockExchangeReceiver)"; + MockExchangeReceiver)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -246,15 +229,12 @@ Union: MockExchangeSender x 10 Expression: Expression: - Expression: - Expression: - Expression: - Expression: + Expression: + Expression: + Expression: + Expression: Expression: - Expression: - Expression: - Expression: - MockExchangeReceiver)"; + MockExchangeReceiver)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } } @@ -294,17 +274,16 @@ Union: Union: Expression x 10: Expression: - Expression: - Expression: - SharedQuery: - Expression: - Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} - Expression: - MergeSorting, limit = 0 - Union: - PartialSorting x 10: limit = 0 - Expression: - MockTableScan)"; + Expression: + SharedQuery: + Expression: + Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} + Expression: + MergeSorting, limit = 0 + Union: + PartialSorting x 10: limit = 0 + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -319,22 +298,20 @@ Union: Union: Expression x 10: Expression: - Expression: - Expression: - SharedQuery: - Expression: - Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} - Union: - Expression x 10: - Expression: - Expression: - SharedQuery: - Expression: - MergeSorting, limit = 0 - Union: - PartialSorting x 10: limit = 0 - Expression: - MockTableScan)"; + Expression: + SharedQuery: + Expression: + Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} + Union: + Expression x 10: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 0 + Union: + PartialSorting x 10: limit = 0 + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } } @@ -499,11 +476,10 @@ CreatingSets Expression x 10: SharedQuery: ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - HashJoinProbe: - Expression: - MockTableScan)"; + Expression x 10: + HashJoinProbe: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -529,15 +505,13 @@ CreatingSets Expression x 10: SharedQuery: ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - HashJoinProbe: - Expression: - Expression: - MockTableScan - Expression x 10: - Expression: - NonJoined: )"; + Expression x 10: + HashJoinProbe: + Expression: + Expression: + MockTableScan + Expression x 10: + NonJoined: )"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -571,15 +545,13 @@ CreatingSets Expression: SharedQuery: ParallelAggregating, max_threads: 20, final: true - Expression x 20: - Expression: - HashJoinProbe: - Expression: - Expression: - MockExchangeReceiver - Expression x 20: - Expression: - NonJoined: )"; + Expression x 20: + HashJoinProbe: + Expression: + Expression: + MockExchangeReceiver + Expression x 20: + NonJoined: )"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 20); } } From 1b18ac45e23703a7ab9db1e881005ac7db348ca7 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 11 Jul 2022 13:36:31 +0800 Subject: [PATCH 7/7] fix f --- dbms/src/Flash/Coprocessor/DAGContext.cpp | 2 + dbms/src/Flash/Coprocessor/DAGContext.h | 2 + .../Coprocessor/DAGQueryBlockInterpreter.cpp | 8 +--- .../Flash/Coprocessor/InterpreterUtils.cpp | 45 ++++++++++++++----- dbms/src/Flash/Coprocessor/InterpreterUtils.h | 1 + 5 files changed, 41 insertions(+), 17 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index 7877ea932eb..1cf7a0d6c87 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -30,6 +30,8 @@ extern const int DIVIDED_BY_ZERO; extern const int INVALID_TIME; } // namespace ErrorCodes +const String enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle"; + bool strictSqlMode(UInt64 sql_mode) { return sql_mode & TiDBSQLMode::STRICT_ALL_TABLES || sql_mode & TiDBSQLMode::STRICT_TRANS_TABLES; diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 9e91161b669..7bfc67afcad 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -121,6 +121,8 @@ inline bool enableFineGrainedShuffle(uint64_t stream_count) return stream_count > 0; } +extern const String enableFineGrainedShuffleExtraInfo; + /// A context used to track the information that needs to be passed around during DAG planning. class DAGContext { diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index aae4d1bb85c..764bf07f533 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -56,10 +56,6 @@ namespace FailPoints { extern const char minimum_block_size_for_cross_join[]; } // namespace FailPoints -namespace -{ -const String enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle"; -} DAGQueryBlockInterpreter::DAGQueryBlockInterpreter( Context & context_, @@ -438,7 +434,7 @@ void DAGQueryBlockInterpreter::executeAggregation( } } -void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc) +void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc, bool enable_fine_grained_shuffle) { orderStreams(pipeline, max_streams, sort_desc, 0, enable_fine_grained_shuffle, context, log); } @@ -446,7 +442,7 @@ void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDe void DAGQueryBlockInterpreter::executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns) { Int64 limit = query_block.limit_or_topn->topn().limit(); - orderStreams(pipeline, max_streams, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit, enable_fine_grained_shuffle, context, log); + orderStreams(pipeline, max_streams, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit, false, context, log); } void DAGQueryBlockInterpreter::recordProfileStreams(DAGPipeline & pipeline, const String & key) diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index c747823b69d..002a06d07b9 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -112,10 +113,14 @@ void orderStreams( size_t max_streams, SortDescription order_descr, Int64 limit, + bool enable_fine_grained_shuffle, const Context & context, const LoggerPtr & log) { const Settings & settings = context.getSettingsRef(); + String extra_info; + if (enable_fine_grained_shuffle) + extra_info = enableFineGrainedShuffleExtraInfo; pipeline.transform([&](auto & stream) { auto sorting_stream = std::make_shared(stream, order_descr, log->identifier(), limit); @@ -127,19 +132,37 @@ void orderStreams( sorting_stream->setLimits(limits); stream = sorting_stream; + stream->setExtraInfo(extra_info); }); - /// If there are several streams, we merge them into one - executeUnion(pipeline, max_streams, log, false, "for partial order"); + if (enable_fine_grained_shuffle) + { + pipeline.transform([&](auto & stream) { + stream = std::make_shared( + stream, + order_descr, + settings.max_block_size, + limit, + settings.max_bytes_before_external_sort, + context.getTemporaryPath(), + log->identifier()); + stream->setExtraInfo(enableFineGrainedShuffleExtraInfo); + }); + } + else + { + /// If there are several streams, we merge them into one + executeUnion(pipeline, max_streams, log, false, "for partial order"); - /// Merge the sorted blocks. - pipeline.firstStream() = std::make_shared( - pipeline.firstStream(), - order_descr, - settings.max_block_size, - limit, - settings.max_bytes_before_external_sort, - context.getTemporaryPath(), - log->identifier()); + /// Merge the sorted blocks. + pipeline.firstStream() = std::make_shared( + pipeline.firstStream(), + order_descr, + settings.max_block_size, + limit, + settings.max_bytes_before_external_sort, + context.getTemporaryPath(), + log->identifier()); + } } } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h index 36280f3b903..bd64346718c 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h @@ -57,6 +57,7 @@ void orderStreams( size_t max_streams, SortDescription order_descr, Int64 limit, + bool enable_fine_grained_shuffle, const Context & context, const LoggerPtr & log); } // namespace DB