From 70a933839fa49ee644a15616cdc379872a73df4a Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 28 Jun 2022 14:24:46 +0800 Subject: [PATCH 1/4] update --- dbms/src/Flash/Coprocessor/DAGContext.cpp | 2 +- dbms/src/Flash/Coprocessor/DAGContext.h | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index 1736e0b6cec..4fb97d1c00a 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -70,7 +70,7 @@ void DAGContext::addSubquery(const String & subquery_id, SubqueryForSet && subqu subqueries.push_back(std::move(subqueries_for_sets)); } -std::unordered_map & DAGContext::getProfileStreamsMap() +std::map & DAGContext::getProfileStreamsMap() { return profile_streams_map; } diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index c20eb3a367e..11e848dfe16 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -192,7 +192,7 @@ class DAGContext } void attachBlockIO(const BlockIO & io_); - std::unordered_map & getProfileStreamsMap(); + std::map & getProfileStreamsMap(); std::unordered_map> & getExecutorIdToJoinIdMap(); @@ -349,7 +349,9 @@ class DAGContext /// Hold io for correcting the destruction order. BlockIO io; /// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams - std::unordered_map profile_streams_map; + /// The order of map.key is necessary for the executor summary of executor list which without executor_id. + /// It uses `${i}_${type}` as executor_id to ensure the order of map.keys is consistent. + std::map profile_streams_map; /// executor_id_to_join_id_map is a map that maps executor id to all the join executor id of itself and all its children. std::unordered_map> executor_id_to_join_id_map; /// join_execute_info_map is a map that maps from join_probe_executor_id to JoinExecuteInfo From 4a1c2bb5535bd3dc1a6b4d9fe00cbe2834e9bea1 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 30 Jun 2022 10:47:29 +0800 Subject: [PATCH 2/4] add more comments --- dbms/src/Flash/Coprocessor/DAGContext.h | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 11e848dfe16..37bc6d0a91f 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -351,6 +351,7 @@ class DAGContext /// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams /// The order of map.key is necessary for the executor summary of executor list which without executor_id. /// It uses `${i}_${type}` as executor_id to ensure the order of map.keys is consistent. + /// Since the number of executors is <= 6 for the executor list, the order of the executor summary can be guaranteed. std::map profile_streams_map; /// executor_id_to_join_id_map is a map that maps executor id to all the join executor id of itself and all its children. std::unordered_map> executor_id_to_join_id_map; From fa26b1137559172ea5514be738ec72913c1d4db2 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Fri, 1 Jul 2022 19:05:15 +0800 Subject: [PATCH 3/4] refactor --- dbms/src/Flash/Coprocessor/DAGContext.cpp | 2 +- dbms/src/Flash/Coprocessor/DAGContext.h | 13 ++++---- dbms/src/Flash/Coprocessor/DAGQuerySource.cpp | 23 +++++++++++++ .../Flash/Coprocessor/DAGResponseWriter.cpp | 33 ++++++++++++++----- 4 files changed, 56 insertions(+), 15 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index 4fb97d1c00a..1736e0b6cec 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -70,7 +70,7 @@ void DAGContext::addSubquery(const String & subquery_id, SubqueryForSet && subqu subqueries.push_back(std::move(subqueries_for_sets)); } -std::map & DAGContext::getProfileStreamsMap() +std::unordered_map & DAGContext::getProfileStreamsMap() { return profile_streams_map; } diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 37bc6d0a91f..bc00eee12e1 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -192,7 +192,7 @@ class DAGContext } void attachBlockIO(const BlockIO & io_); - std::map & getProfileStreamsMap(); + std::unordered_map & getProfileStreamsMap(); std::unordered_map> & getExecutorIdToJoinIdMap(); @@ -341,6 +341,10 @@ class DAGContext std::vector output_field_types; std::vector output_offsets; + /// Hold the order of list based executors. + /// It is used to ensure that the order of Execution summary of list based executors is the same as the order of list based executors. + std::vector list_based_executors_order; + private: void initExecutorIdToJoinIdMap(); void initOutputInfo(); @@ -348,11 +352,8 @@ class DAGContext private: /// Hold io for correcting the destruction order. BlockIO io; - /// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams - /// The order of map.key is necessary for the executor summary of executor list which without executor_id. - /// It uses `${i}_${type}` as executor_id to ensure the order of map.keys is consistent. - /// Since the number of executors is <= 6 for the executor list, the order of the executor summary can be guaranteed. - std::map profile_streams_map; + /// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams. + std::unordered_map profile_streams_map; /// executor_id_to_join_id_map is a map that maps executor id to all the join executor id of itself and all its children. std::unordered_map> executor_id_to_join_id_map; /// join_execute_info_map is a map that maps from join_probe_executor_id to JoinExecuteInfo diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp index 882699e1599..d68a7b17aaa 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp @@ -20,6 +20,26 @@ namespace DB { +namespace +{ +void fillOrderForListBasedExecutors(DAGContext & dag_context, const DAGQueryBlock & query_block) +{ + assert(query_block.source); + auto & list_based_executors_order = dag_context.list_based_executors_order; + list_based_executors_order.push_back(query_block.source_name); + if (query_block.selection) + list_based_executors_order.push_back(query_block.selection_name); + if (query_block.aggregation) + list_based_executors_order.push_back(query_block.aggregation_name); + if (query_block.having) + list_based_executors_order.push_back(query_block.having_name); + if (query_block.limit_or_topn) + list_based_executors_order.push_back(query_block.limit_or_topn_name); + if (query_block.exchange_sender) + dag_context.list_based_executors_order.push_back(query_block.exchange_sender_name); +} +} // namespace + DAGQuerySource::DAGQuerySource(Context & context_) : context(context_) { @@ -32,6 +52,9 @@ DAGQuerySource::DAGQuerySource(Context & context_) else { root_query_block = std::make_shared(1, dag_request.executors()); + auto & dag_context = getDAGContext(); + if (!dag_context.return_executor_id) + fillOrderForListBasedExecutors(dag_context, *root_query_block); } } diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp index 53bebc91da8..d3c4f56e10c 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp @@ -89,12 +89,10 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo } } - /// add execution_summary for local executor - for (auto & p : dag_context.getProfileStreamsMap()) - { + auto fill_execution_summary = [&](const String & executor_id, const BlockInputStreams & streams) { ExecutionSummary current; /// part 1: local execution info - for (auto & stream_ptr : p.second) + for (auto & stream_ptr : streams) { if (auto * p_stream = dynamic_cast(stream_ptr.get())) { @@ -105,16 +103,16 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo current.concurrency++; } /// part 2: remote execution info - if (merged_remote_execution_summaries.find(p.first) != merged_remote_execution_summaries.end()) + if (merged_remote_execution_summaries.find(executor_id) != merged_remote_execution_summaries.end()) { - for (auto & remote : merged_remote_execution_summaries[p.first]) + for (auto & remote : merged_remote_execution_summaries[executor_id]) current.merge(remote, false); } /// part 3: for join need to add the build time /// In TiFlash, a hash join's build side is finished before probe side starts, /// so the join probe side's running time does not include hash table's build time, /// when construct ExecSummaries, we need add the build cost to probe executor - auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(p.first); + auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(executor_id); if (all_join_id_it != dag_context.getExecutorIdToJoinIdMap().end()) { for (const auto & join_executor_id : all_join_id_it->second) @@ -138,8 +136,27 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo } current.time_processed_ns += dag_context.compile_time_ns; - fillTiExecutionSummary(response.add_execution_summaries(), current, p.first, delta_mode); + fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id, delta_mode); + }; + + /// add execution_summary for local executor + if (dag_context.return_executor_id) + { + for (auto & p : dag_context.getProfileStreamsMap()) + fill_execution_summary(p.first, p.second); + } + else + { + const auto & profile_streams_map = dag_context.getProfileStreamsMap(); + assert(profile_streams_map.size() == dag_context.list_based_executors_order.size()); + for (const auto & executor_id : dag_context.list_based_executors_order) + { + auto it = profile_streams_map.find(executor_id); + assert(it != profile_streams_map.end()); + fill_execution_summary(executor_id, it->second); + } } + for (auto & p : merged_remote_execution_summaries) { if (local_executors.find(p.first) == local_executors.end()) From 468db25446575cd33957496d5d21e9165d307cbb Mon Sep 17 00:00:00 2001 From: SeaRise Date: Fri, 1 Jul 2022 19:32:59 +0800 Subject: [PATCH 4/4] fix clang err --- dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp index d3c4f56e10c..33f6d99f9d8 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp @@ -92,7 +92,7 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo auto fill_execution_summary = [&](const String & executor_id, const BlockInputStreams & streams) { ExecutionSummary current; /// part 1: local execution info - for (auto & stream_ptr : streams) + for (const auto & stream_ptr : streams) { if (auto * p_stream = dynamic_cast(stream_ptr.get())) {