Skip to content

Commit

Permalink
fix incorrect execution summary of table scan (#5624)
Browse files Browse the repository at this point in the history
close #5625
  • Loading branch information
SeaRise authored Aug 16, 2022
1 parent 49d8050 commit 8404e65
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 92 deletions.
14 changes: 12 additions & 2 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,18 @@ void fillOrderForListBasedExecutors(DAGContext & dag_context, const PhysicalPlan
auto & list_based_executors_order = dag_context.list_based_executors_order;
PhysicalPlanVisitor::visitPostOrder(root_node, [&](const PhysicalPlanNodePtr & plan) {
assert(plan);
if (plan->isRecordProfileStreams())
list_based_executors_order.push_back(plan->execId());
if (plan->isTiDBOperator())
{
if (plan->tp() == PlanType::TableScan)
{
auto physical_table_scan = std::static_pointer_cast<PhysicalTableScan>(plan);
if (physical_table_scan->hasPushDownFilter())
list_based_executors_order.push_back(physical_table_scan->getPushDownFilterId());
list_based_executors_order.push_back(physical_table_scan->execId());
}
else
list_based_executors_order.push_back(plan->execId());
}
});
}
} // namespace
Expand Down
16 changes: 11 additions & 5 deletions dbms/src/Flash/Planner/PhysicalPlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ String PhysicalPlanNode::toString()
return buffer.toString();
};
return fmt::format(
"<{}, {}> | is_record_profile_streams: {}, schema: {}",
"<{}, {}> | is_tidb_operator: {}, schema: {}",
type.toString(),
executor_id,
is_record_profile_streams,
is_tidb_operator,
schema_to_string());
}

Expand All @@ -59,14 +59,20 @@ void PhysicalPlanNode::finalize()

void PhysicalPlanNode::recordProfileStreams(DAGPipeline & pipeline, const Context & context)
{
auto & profile_streams = context.getDAGContext()->getProfileStreamsMap()[executor_id];
pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); });
auto & profile_streams_map = context.getDAGContext()->getProfileStreamsMap();
/// The profile stream of some operators has been recorded.
/// For example, `DAGStorageInterpreter` records the profile streams of PhysicalTableScan.
if (profile_streams_map.find(executor_id) == profile_streams_map.end())
{
auto & profile_streams = profile_streams_map[executor_id];
pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); });
}
}

void PhysicalPlanNode::transform(DAGPipeline & pipeline, Context & context, size_t max_streams)
{
transformImpl(pipeline, context, max_streams);
if (is_record_profile_streams)
if (is_tidb_operator)
recordProfileStreams(pipeline, context);
if (is_restore_concurrency)
{
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Planner/PhysicalPlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ class PhysicalPlanNode
/// Obtain a sample block that contains the names and types of result columns.
virtual const Block & getSampleBlock() const = 0;

bool isRecordProfileStreams() const { return is_record_profile_streams; }
bool isTiDBOperator() const { return is_tidb_operator; }

void disableRecordProfileStreams() { is_record_profile_streams = false; }
void notTiDBOperator() { is_tidb_operator = false; }

void disableRestoreConcurrency() { is_restore_concurrency = false; }

Expand All @@ -79,7 +79,7 @@ class PhysicalPlanNode
PlanType type;
NamesAndTypes schema;

bool is_record_profile_streams = true;
bool is_tidb_operator = true;
bool is_restore_concurrency = true;

LoggerPtr log;
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Planner/plans/PhysicalProjection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ PhysicalPlanNodePtr PhysicalProjection::buildNonRootFinal(
child,
"final projection",
project_actions);
// For final projection, no need to record profile streams.
physical_projection->disableRecordProfileStreams();
// Final Projection is not a tidb operator, so no need to record profile streams.
physical_projection->notTiDBOperator();
return physical_projection;
}

Expand Down Expand Up @@ -137,8 +137,8 @@ PhysicalPlanNodePtr PhysicalProjection::buildRootFinal(
child,
"final projection",
project_actions);
// For final projection, no need to record profile streams.
physical_projection->disableRecordProfileStreams();
// Final Projection is not a tidb operator, so no need to record profile streams.
physical_projection->notTiDBOperator();
return physical_projection;
}

Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Planner/plans/PhysicalTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,10 @@ bool PhysicalTableScan::hasPushDownFilter() const
{
return push_down_filter.hasValue();
}

const String & PhysicalTableScan::getPushDownFilterId() const
{
assert(hasPushDownFilter());
return push_down_filter.executor_id;
}
} // namespace DB
6 changes: 4 additions & 2 deletions dbms/src/Flash/Planner/plans/PhysicalTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ class PhysicalTableScan : public PhysicalLeaf

bool pushDownFilter(const String & filter_executor_id, const tipb::Selection & selection);

bool hasPushDownFilter() const;

const String & getPushDownFilterId() const;

private:
void transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override;

bool hasPushDownFilter() const;

private:
PushDownFilter push_down_filter;

Expand Down
Loading

0 comments on commit 8404e65

Please sign in to comment.