diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp index 1384457729e..c3654c435ab 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -54,8 +54,18 @@ void fillOrderForListBasedExecutors(DAGContext & dag_context, const PhysicalPlan auto & list_based_executors_order = dag_context.list_based_executors_order; PhysicalPlanVisitor::visitPostOrder(root_node, [&](const PhysicalPlanNodePtr & plan) { assert(plan); - if (plan->isRecordProfileStreams()) - list_based_executors_order.push_back(plan->execId()); + if (plan->isTiDBOperator()) + { + if (plan->tp() == PlanType::TableScan) + { + auto physical_table_scan = std::static_pointer_cast(plan); + if (physical_table_scan->hasPushDownFilter()) + list_based_executors_order.push_back(physical_table_scan->getPushDownFilterId()); + list_based_executors_order.push_back(physical_table_scan->execId()); + } + else + list_based_executors_order.push_back(plan->execId()); + } }); } } // namespace diff --git a/dbms/src/Flash/Planner/PhysicalPlanNode.cpp b/dbms/src/Flash/Planner/PhysicalPlanNode.cpp index 82482548737..b16a2e544b7 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanNode.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlanNode.cpp @@ -45,10 +45,10 @@ String PhysicalPlanNode::toString() return buffer.toString(); }; return fmt::format( - "<{}, {}> | is_record_profile_streams: {}, schema: {}", + "<{}, {}> | is_tidb_operator: {}, schema: {}", type.toString(), executor_id, - is_record_profile_streams, + is_tidb_operator, schema_to_string()); } @@ -59,14 +59,20 @@ void PhysicalPlanNode::finalize() void PhysicalPlanNode::recordProfileStreams(DAGPipeline & pipeline, const Context & context) { - auto & profile_streams = context.getDAGContext()->getProfileStreamsMap()[executor_id]; - pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); }); + auto & profile_streams_map = context.getDAGContext()->getProfileStreamsMap(); + /// The profile stream of some operators has been recorded. + /// For example, `DAGStorageInterpreter` records the profile streams of PhysicalTableScan. + if (profile_streams_map.find(executor_id) == profile_streams_map.end()) + { + auto & profile_streams = profile_streams_map[executor_id]; + pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); }); + } } void PhysicalPlanNode::transform(DAGPipeline & pipeline, Context & context, size_t max_streams) { transformImpl(pipeline, context, max_streams); - if (is_record_profile_streams) + if (is_tidb_operator) recordProfileStreams(pipeline, context); if (is_restore_concurrency) { diff --git a/dbms/src/Flash/Planner/PhysicalPlanNode.h b/dbms/src/Flash/Planner/PhysicalPlanNode.h index e4437fe27ef..397f7e037f9 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanNode.h +++ b/dbms/src/Flash/Planner/PhysicalPlanNode.h @@ -62,9 +62,9 @@ class PhysicalPlanNode /// Obtain a sample block that contains the names and types of result columns. virtual const Block & getSampleBlock() const = 0; - bool isRecordProfileStreams() const { return is_record_profile_streams; } + bool isTiDBOperator() const { return is_tidb_operator; } - void disableRecordProfileStreams() { is_record_profile_streams = false; } + void notTiDBOperator() { is_tidb_operator = false; } void disableRestoreConcurrency() { is_restore_concurrency = false; } @@ -79,7 +79,7 @@ class PhysicalPlanNode PlanType type; NamesAndTypes schema; - bool is_record_profile_streams = true; + bool is_tidb_operator = true; bool is_restore_concurrency = true; LoggerPtr log; diff --git a/dbms/src/Flash/Planner/plans/PhysicalProjection.cpp b/dbms/src/Flash/Planner/plans/PhysicalProjection.cpp index 34b9a7688e4..f2e13417ce4 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalProjection.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalProjection.cpp @@ -92,8 +92,8 @@ PhysicalPlanNodePtr PhysicalProjection::buildNonRootFinal( child, "final projection", project_actions); - // For final projection, no need to record profile streams. - physical_projection->disableRecordProfileStreams(); + // Final Projection is not a tidb operator, so no need to record profile streams. + physical_projection->notTiDBOperator(); return physical_projection; } @@ -137,8 +137,8 @@ PhysicalPlanNodePtr PhysicalProjection::buildRootFinal( child, "final projection", project_actions); - // For final projection, no need to record profile streams. - physical_projection->disableRecordProfileStreams(); + // Final Projection is not a tidb operator, so no need to record profile streams. + physical_projection->notTiDBOperator(); return physical_projection; } diff --git a/dbms/src/Flash/Planner/plans/PhysicalTableScan.cpp b/dbms/src/Flash/Planner/plans/PhysicalTableScan.cpp index cb702f60bd1..a8f9bc67081 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalTableScan.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalTableScan.cpp @@ -116,4 +116,10 @@ bool PhysicalTableScan::hasPushDownFilter() const { return push_down_filter.hasValue(); } + +const String & PhysicalTableScan::getPushDownFilterId() const +{ + assert(hasPushDownFilter()); + return push_down_filter.executor_id; +} } // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalTableScan.h b/dbms/src/Flash/Planner/plans/PhysicalTableScan.h index f064e69e811..947750950e8 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalTableScan.h +++ b/dbms/src/Flash/Planner/plans/PhysicalTableScan.h @@ -40,11 +40,13 @@ class PhysicalTableScan : public PhysicalLeaf bool pushDownFilter(const String & filter_executor_id, const tipb::Selection & selection); + bool hasPushDownFilter() const; + + const String & getPushDownFilterId() const; + private: void transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; - bool hasPushDownFilter() const; - private: PushDownFilter push_down_filter; diff --git a/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp b/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp index 9dc3452de32..20ec1150276 100644 --- a/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp +++ b/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp @@ -133,9 +133,9 @@ try execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( Expression: Filter @@ -155,9 +155,9 @@ try execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( Expression: Limit, limit = 1 @@ -177,9 +177,9 @@ try execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( Expression: MergeSorting, limit = 1 @@ -201,9 +201,9 @@ try execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( Expression: Expression: @@ -225,9 +225,9 @@ try execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: - | is_record_profile_streams: true, schema: - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: + | is_tidb_operator: true, schema: + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( Expression: Expression: @@ -246,9 +246,9 @@ try execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: true, schema: , - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: true, schema: , + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( MockExchangeSender Expression: @@ -267,8 +267,8 @@ try execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( Expression: MockExchangeReceiver)", @@ -295,10 +295,10 @@ try execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , , - | is_record_profile_streams: true, schema: , , - | is_record_profile_streams: true, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , , + | is_tidb_operator: true, schema: , , + | is_tidb_operator: true, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( Expression: Expression: @@ -314,10 +314,10 @@ Expression: execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , , - | is_record_profile_streams: true, schema: , , - | is_record_profile_streams: true, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , , + | is_tidb_operator: true, schema: , , + | is_tidb_operator: true, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( Expression: Expression: @@ -340,8 +340,8 @@ try execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( Expression: MockTableScan)", @@ -366,12 +366,12 @@ try execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , , , - | is_record_profile_streams: true, schema: , , , - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , , , + | is_tidb_operator: true, schema: , , , + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( CreatingSets HashJoinBuildBlockInputStream: , join_kind = Inner @@ -393,12 +393,12 @@ CreatingSets execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , , , - | is_record_profile_streams: true, schema: , , , - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , , , + | is_tidb_operator: true, schema: , , , + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( CreatingSets HashJoinBuildBlockInputStream: , join_kind = Left @@ -420,12 +420,12 @@ CreatingSets execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , , , - | is_record_profile_streams: true, schema: , , , - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , , , + | is_tidb_operator: true, schema: , , , + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( CreatingSets HashJoinBuildBlockInputStream: , join_kind = Right @@ -459,20 +459,20 @@ CreatingSets execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , , , , , , , , , - | is_record_profile_streams: true, schema: , , , , , , , , , - | is_record_profile_streams: false, schema: , , , , , - | is_record_profile_streams: true, schema: , , , , , - | is_record_profile_streams: false, schema: , , - | is_record_profile_streams: true, schema: , , - | is_record_profile_streams: false, schema: , , - | is_record_profile_streams: true, schema: , , - | is_record_profile_streams: false, schema: , , , - | is_record_profile_streams: true, schema: , , , - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , , , , , , , , , + | is_tidb_operator: true, schema: , , , , , , , , , + | is_tidb_operator: false, schema: , , , , , + | is_tidb_operator: true, schema: , , , , , + | is_tidb_operator: false, schema: , , + | is_tidb_operator: true, schema: , , + | is_tidb_operator: false, schema: , , + | is_tidb_operator: true, schema: , , + | is_tidb_operator: false, schema: , , , + | is_tidb_operator: true, schema: , , , + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( CreatingSets HashJoinBuildBlockInputStream x 2: , join_kind = Right @@ -529,20 +529,20 @@ CreatingSets execute( request, /*expected_physical_plan=*/R"( - | is_record_profile_streams: false, schema: , , , , , , , , , - | is_record_profile_streams: true, schema: , , , , , , , , , - | is_record_profile_streams: false, schema: , , , , , - | is_record_profile_streams: true, schema: , , , , , - | is_record_profile_streams: false, schema: , , - | is_record_profile_streams: true, schema: , , - | is_record_profile_streams: false, schema: , , - | is_record_profile_streams: true, schema: , , - | is_record_profile_streams: false, schema: , , , - | is_record_profile_streams: true, schema: , , , - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , - | is_record_profile_streams: false, schema: , - | is_record_profile_streams: true, schema: , )", + | is_tidb_operator: false, schema: , , , , , , , , , + | is_tidb_operator: true, schema: , , , , , , , , , + | is_tidb_operator: false, schema: , , , , , + | is_tidb_operator: true, schema: , , , , , + | is_tidb_operator: false, schema: , , + | is_tidb_operator: true, schema: , , + | is_tidb_operator: false, schema: , , + | is_tidb_operator: true, schema: , , + | is_tidb_operator: false, schema: , , , + | is_tidb_operator: true, schema: , , , + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , + | is_tidb_operator: false, schema: , + | is_tidb_operator: true, schema: , )", /*expected_streams=*/R"( CreatingSets HashJoinBuildBlockInputStream x 2: , join_kind = Right