Skip to content

Commit

Permalink
fix the wrong order of execution summary for list based executors (pi…
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaRise authored and Lloyd-Pottiger committed Jul 19, 2022
1 parent 5f1d278 commit 36fb601
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 9 deletions.
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,18 @@ class DAGContext
std::vector<tipb::FieldType> output_field_types;
std::vector<Int32> output_offsets;

/// Holds the order of list-based executors.
/// It is used to ensure that the execution summaries of list-based executors are emitted in the same order as the executors themselves.
std::vector<String> list_based_executors_order;

private:
void initExecutorIdToJoinIdMap();
void initOutputInfo();

private:
/// Hold io for correcting the destruction order.
BlockIO io;
/// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams
/// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams.
std::unordered_map<String, BlockInputStreams> profile_streams_map;
/// executor_id_to_join_id_map is a map that maps executor id to all the join executor id of itself and all its children.
std::unordered_map<String, std::vector<String>> executor_id_to_join_id_map;
Expand Down
23 changes: 23 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,26 @@

namespace DB
{
namespace
{
/// Appends the executor names of `query_block` to `dag_context.list_based_executors_order`
/// in a fixed sequence: source, selection, aggregation, having, limit/topN, exchange sender.
/// The source name is always appended (the source is asserted to be set); every other
/// name is appended only when the corresponding member of `query_block` is set.
void fillOrderForListBasedExecutors(DAGContext & dag_context, const DAGQueryBlock & query_block)
{
    assert(query_block.source);

    auto & order = dag_context.list_based_executors_order;

    // Appends `executor_name` only when the matching executor is present in the query block.
    const auto append_if_present = [&order](const auto & executor, const auto & executor_name) {
        if (executor)
            order.push_back(executor_name);
    };

    order.push_back(query_block.source_name);
    append_if_present(query_block.selection, query_block.selection_name);
    append_if_present(query_block.aggregation, query_block.aggregation_name);
    append_if_present(query_block.having, query_block.having_name);
    append_if_present(query_block.limit_or_topn, query_block.limit_or_topn_name);
    append_if_present(query_block.exchange_sender, query_block.exchange_sender_name);
}
} // namespace

DAGQuerySource::DAGQuerySource(Context & context_)
: context(context_)
{
Expand All @@ -32,6 +52,9 @@ DAGQuerySource::DAGQuerySource(Context & context_)
else
{
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request.executors());
auto & dag_context = getDAGContext();
if (!dag_context.return_executor_id)
fillOrderForListBasedExecutors(dag_context, *root_query_block);
}
}

Expand Down
33 changes: 25 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,10 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo
}
}

/// add execution_summary for local executor
for (auto & p : dag_context.getProfileStreamsMap())
{
auto fill_execution_summary = [&](const String & executor_id, const BlockInputStreams & streams) {
ExecutionSummary current;
/// part 1: local execution info
for (auto & stream_ptr : p.second)
for (const auto & stream_ptr : streams)
{
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream_ptr.get()))
{
Expand All @@ -105,16 +103,16 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo
current.concurrency++;
}
/// part 2: remote execution info
if (merged_remote_execution_summaries.find(p.first) != merged_remote_execution_summaries.end())
if (merged_remote_execution_summaries.find(executor_id) != merged_remote_execution_summaries.end())
{
for (auto & remote : merged_remote_execution_summaries[p.first])
for (auto & remote : merged_remote_execution_summaries[executor_id])
current.merge(remote, false);
}
/// part 3: for a join, the build time needs to be added
/// In TiFlash, a hash join's build side finishes before the probe side starts,
/// so the probe side's running time does not include the hash table's build time.
/// When constructing ExecSummaries, we need to add the build cost to the probe executor.
auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(p.first);
auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(executor_id);
if (all_join_id_it != dag_context.getExecutorIdToJoinIdMap().end())
{
for (const auto & join_executor_id : all_join_id_it->second)
Expand All @@ -138,8 +136,27 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo
}

current.time_processed_ns += dag_context.compile_time_ns;
fillTiExecutionSummary(response.add_execution_summaries(), current, p.first, delta_mode);
fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id, delta_mode);
};

/// add execution_summary for local executor
if (dag_context.return_executor_id)
{
for (auto & p : dag_context.getProfileStreamsMap())
fill_execution_summary(p.first, p.second);
}
else
{
const auto & profile_streams_map = dag_context.getProfileStreamsMap();
assert(profile_streams_map.size() == dag_context.list_based_executors_order.size());
for (const auto & executor_id : dag_context.list_based_executors_order)
{
auto it = profile_streams_map.find(executor_id);
assert(it != profile_streams_map.end());
fill_execution_summary(executor_id, it->second);
}
}

for (auto & p : merged_remote_execution_summaries)
{
if (local_executors.find(p.first) == local_executors.end())
Expand Down

0 comments on commit 36fb601

Please sign in to comment.