diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp index e3cafd268f0..8f17ec0caef 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp @@ -12,187 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include #include namespace DB { -void ExecutionSummary::merge(const ExecutionSummary & other, bool streaming_call) -{ - if (streaming_call) - { - time_processed_ns = std::max(time_processed_ns, other.time_processed_ns); - num_produced_rows = std::max(num_produced_rows, other.num_produced_rows); - num_iterations = std::max(num_iterations, other.num_iterations); - concurrency = std::max(concurrency, other.concurrency); - } - else - { - time_processed_ns = std::max(time_processed_ns, other.time_processed_ns); - num_produced_rows += other.num_produced_rows; - num_iterations += other.num_iterations; - concurrency += other.concurrency; - } -} - -/// delta_mode means when for a streaming call, return the delta execution summary -/// because TiDB is not aware of the streaming call when it handle the execution summaries -/// so we need to "pretend to be a unary call", can be removed if TiDB support streaming -/// call's execution summaries directly -void DAGResponseWriter::fillTiExecutionSummary( - tipb::ExecutorExecutionSummary * execution_summary, - ExecutionSummary & current, - const String & executor_id, - bool delta_mode) -{ - auto & prev_stats = previous_execution_stats[executor_id]; - - execution_summary->set_time_processed_ns( - delta_mode ? current.time_processed_ns - prev_stats.time_processed_ns : current.time_processed_ns); - execution_summary->set_num_produced_rows( - delta_mode ? current.num_produced_rows - prev_stats.num_produced_rows : current.num_produced_rows); - execution_summary->set_num_iterations(delta_mode ? 
current.num_iterations - prev_stats.num_iterations : current.num_iterations); - execution_summary->set_concurrency(delta_mode ? current.concurrency - prev_stats.concurrency : current.concurrency); - prev_stats = current; - if (dag_context.return_executor_id) - execution_summary->set_executor_id(executor_id); -} - -template -void mergeRemoteExecuteSummaries( - RemoteBlockInputStream * input_stream, - std::unordered_map> & execution_summaries) -{ - size_t source_num = input_stream->getSourceNum(); - for (size_t s_index = 0; s_index < source_num; s_index++) - { - auto remote_execution_summaries = input_stream->getRemoteExecutionSummaries(s_index); - if (remote_execution_summaries == nullptr) - continue; - bool is_streaming_call = input_stream->isStreamingCall(); - for (auto & p : *remote_execution_summaries) - { - if (execution_summaries[p.first].size() < source_num) - { - execution_summaries[p.first].resize(source_num); - } - execution_summaries[p.first][s_index].merge(p.second, is_streaming_call); - } - } -} - -void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, bool delta_mode) -{ - if (!dag_context.collect_execution_summaries) - return; - /// get executionSummary info from remote input streams - std::unordered_map> merged_remote_execution_summaries; - for (const auto & map_entry : dag_context.getInBoundIOInputStreamsMap()) - { - for (const auto & stream_ptr : map_entry.second) - { - if (auto * exchange_receiver_stream_ptr = dynamic_cast(stream_ptr.get())) - { - mergeRemoteExecuteSummaries(exchange_receiver_stream_ptr, merged_remote_execution_summaries); - } - else if (auto * cop_stream_ptr = dynamic_cast(stream_ptr.get())) - { - mergeRemoteExecuteSummaries(cop_stream_ptr, merged_remote_execution_summaries); - } - else - { - /// local read input stream - } - } - } - - auto fill_execution_summary = [&](const String & executor_id, const BlockInputStreams & streams) { - ExecutionSummary current; - /// part 1: local execution info - for (const 
auto & stream_ptr : streams) - { - if (auto * p_stream = dynamic_cast(stream_ptr.get())) - { - current.time_processed_ns = std::max(current.time_processed_ns, p_stream->getProfileInfo().execution_time); - current.num_produced_rows += p_stream->getProfileInfo().rows; - current.num_iterations += p_stream->getProfileInfo().blocks; - } - current.concurrency++; - } - /// part 2: remote execution info - if (merged_remote_execution_summaries.find(executor_id) != merged_remote_execution_summaries.end()) - { - for (auto & remote : merged_remote_execution_summaries[executor_id]) - current.merge(remote, false); - } - /// part 3: for join need to add the build time - /// In TiFlash, a hash join's build side is finished before probe side starts, - /// so the join probe side's running time does not include hash table's build time, - /// when construct ExecSummaries, we need add the build cost to probe executor - auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(executor_id); - if (all_join_id_it != dag_context.getExecutorIdToJoinIdMap().end()) - { - for (const auto & join_executor_id : all_join_id_it->second) - { - auto it = dag_context.getJoinExecuteInfoMap().find(join_executor_id); - if (it != dag_context.getJoinExecuteInfoMap().end()) - { - UInt64 process_time_for_build = 0; - for (const auto & join_build_stream : it->second.join_build_streams) - { - if (auto * p_stream = dynamic_cast(join_build_stream.get()); p_stream) - process_time_for_build = std::max(process_time_for_build, p_stream->getProfileInfo().execution_time); - } - current.time_processed_ns += process_time_for_build; - } - } - } - - current.time_processed_ns += dag_context.compile_time_ns; - fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id, delta_mode); - }; - - /// add execution_summary for local executor - if (dag_context.return_executor_id) - { - for (auto & p : dag_context.getProfileStreamsMap()) - fill_execution_summary(p.first, p.second); - } - else - { - const 
auto & profile_streams_map = dag_context.getProfileStreamsMap(); - assert(profile_streams_map.size() == dag_context.list_based_executors_order.size()); - for (const auto & executor_id : dag_context.list_based_executors_order) - { - auto it = profile_streams_map.find(executor_id); - assert(it != profile_streams_map.end()); - fill_execution_summary(executor_id, it->second); - } - } - - for (auto & p : merged_remote_execution_summaries) - { - if (local_executors.find(p.first) == local_executors.end()) - { - ExecutionSummary merged; - for (auto & remote : p.second) - merged.merge(remote, false); - fillTiExecutionSummary(response.add_execution_summaries(), merged, p.first, delta_mode); - } - } -} - DAGResponseWriter::DAGResponseWriter( Int64 records_per_chunk_, DAGContext & dag_context_) : records_per_chunk(records_per_chunk_) + , summary_collector(dag_context_) , dag_context(dag_context_) { - for (auto & p : dag_context.getProfileStreamsMap()) - { - local_executors.insert(p.first); - } if (dag_context.encode_type == tipb::EncodeType::TypeCHBlock) { records_per_chunk = -1; diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h index fba7b011d85..b4ce67bffd0 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h @@ -14,41 +14,17 @@ #pragma once -#include -#include +#include #include namespace DB { -/// do not need be thread safe since it is only used in single thread env -struct ExecutionSummary -{ - UInt64 time_processed_ns; - UInt64 num_produced_rows; - UInt64 num_iterations; - UInt64 concurrency; - ExecutionSummary() - : time_processed_ns(0) - , num_produced_rows(0) - , num_iterations(0) - , concurrency(0) - {} - - void merge(const ExecutionSummary & other, bool streaming_call); -}; - class DAGResponseWriter { public: DAGResponseWriter( Int64 records_per_chunk_, DAGContext & dag_context_); - void fillTiExecutionSummary( - tipb::ExecutorExecutionSummary * 
execution_summary, - ExecutionSummary & current, - const String & executor_id, - bool delta_mode); - void addExecuteSummaries(tipb::SelectResponse & response, bool delta_mode); virtual void write(const Block & block) = 0; virtual void finishWrite() = 0; virtual ~DAGResponseWriter() = default; @@ -56,9 +32,8 @@ class DAGResponseWriter protected: Int64 records_per_chunk; + ExecutionSummaryCollector summary_collector; DAGContext & dag_context; - std::unordered_map previous_execution_stats; - std::unordered_set local_executors; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp b/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp new file mode 100644 index 00000000000..190de9ee3d0 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp @@ -0,0 +1,37 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include + +namespace DB +{ + +void ExecutionSummary::merge(const ExecutionSummary & other, bool streaming_call) +{ + if (streaming_call) + { + time_processed_ns = std::max(time_processed_ns, other.time_processed_ns); + num_produced_rows = std::max(num_produced_rows, other.num_produced_rows); + num_iterations = std::max(num_iterations, other.num_iterations); + concurrency = std::max(concurrency, other.concurrency); + } + else + { + time_processed_ns = std::max(time_processed_ns, other.time_processed_ns); + num_produced_rows += other.num_produced_rows; + num_iterations += other.num_iterations; + concurrency += other.concurrency; + } +} +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummary.h b/dbms/src/Flash/Coprocessor/ExecutionSummary.h new file mode 100644 index 00000000000..2cd2b8635de --- /dev/null +++ b/dbms/src/Flash/Coprocessor/ExecutionSummary.h @@ -0,0 +1,33 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include + +namespace DB +{ +/// does not need to be thread safe since it is only used in a single-threaded environment +struct ExecutionSummary +{ + UInt64 time_processed_ns = 0; + UInt64 num_produced_rows = 0; + UInt64 num_iterations = 0; + UInt64 concurrency = 0; + ExecutionSummary() = default; + + void merge(const ExecutionSummary & other, bool streaming_call); +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp new file mode 100644 index 00000000000..61bfb95c7c8 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp @@ -0,0 +1,168 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB +{ + +/// delta_mode means that, for a streaming call, the delta execution summary is returned, +/// because TiDB is not aware of the streaming call when it handles the execution summaries, +/// so we need to "pretend to be a unary call". This can be removed once TiDB supports streaming +/// calls' execution summaries directly. +void ExecutionSummaryCollector::fillTiExecutionSummary( + tipb::ExecutorExecutionSummary * execution_summary, + ExecutionSummary & current, + const String & executor_id, + bool delta_mode) +{ + auto & prev_stats = previous_execution_stats[executor_id]; + + execution_summary->set_time_processed_ns( + delta_mode ? 
current.time_processed_ns - prev_stats.time_processed_ns : current.time_processed_ns); + execution_summary->set_num_produced_rows( + delta_mode ? current.num_produced_rows - prev_stats.num_produced_rows : current.num_produced_rows); + execution_summary->set_num_iterations(delta_mode ? current.num_iterations - prev_stats.num_iterations : current.num_iterations); + execution_summary->set_concurrency(delta_mode ? current.concurrency - prev_stats.concurrency : current.concurrency); + prev_stats = current; + if (dag_context.return_executor_id) + execution_summary->set_executor_id(executor_id); +} + +template +void mergeRemoteExecuteSummaries( + RemoteBlockInputStream * input_stream, + std::unordered_map> & execution_summaries) +{ + size_t source_num = input_stream->getSourceNum(); + for (size_t s_index = 0; s_index < source_num; ++s_index) + { + auto remote_execution_summaries = input_stream->getRemoteExecutionSummaries(s_index); + if (remote_execution_summaries == nullptr) + continue; + bool is_streaming_call = input_stream->isStreamingCall(); + for (auto & p : *remote_execution_summaries) + { + if (execution_summaries[p.first].size() < source_num) + { + execution_summaries[p.first].resize(source_num); + } + execution_summaries[p.first][s_index].merge(p.second, is_streaming_call); + } + } +} + +void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & response, bool delta_mode) +{ + if (!dag_context.collect_execution_summaries) + return; + /// get executionSummary info from remote input streams + std::unordered_map> merged_remote_execution_summaries; + for (const auto & map_entry : dag_context.getInBoundIOInputStreamsMap()) + { + for (const auto & stream_ptr : map_entry.second) + { + if (auto * exchange_receiver_stream_ptr = dynamic_cast(stream_ptr.get())) + { + mergeRemoteExecuteSummaries(exchange_receiver_stream_ptr, merged_remote_execution_summaries); + } + else if (auto * cop_stream_ptr = dynamic_cast(stream_ptr.get())) + { + 
mergeRemoteExecuteSummaries(cop_stream_ptr, merged_remote_execution_summaries); + } + else + { + /// local read input stream + } + } + } + + auto fill_execution_summary = [&](const String & executor_id, const BlockInputStreams & streams) { + ExecutionSummary current; + /// part 1: local execution info + for (const auto & stream_ptr : streams) + { + if (auto * p_stream = dynamic_cast(stream_ptr.get())) + { + current.time_processed_ns = std::max(current.time_processed_ns, p_stream->getProfileInfo().execution_time); + current.num_produced_rows += p_stream->getProfileInfo().rows; + current.num_iterations += p_stream->getProfileInfo().blocks; + } + current.concurrency++; + } + /// part 2: remote execution info + if (merged_remote_execution_summaries.find(executor_id) != merged_remote_execution_summaries.end()) + { + for (auto & remote : merged_remote_execution_summaries[executor_id]) + current.merge(remote, false); + } + /// part 3: for join need to add the build time + /// In TiFlash, a hash join's build side is finished before probe side starts, + /// so the join probe side's running time does not include hash table's build time, + /// when construct ExecSummaries, we need add the build cost to probe executor + auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(executor_id); + if (all_join_id_it != dag_context.getExecutorIdToJoinIdMap().end()) + { + for (const auto & join_executor_id : all_join_id_it->second) + { + auto it = dag_context.getJoinExecuteInfoMap().find(join_executor_id); + if (it != dag_context.getJoinExecuteInfoMap().end()) + { + UInt64 process_time_for_build = 0; + for (const auto & join_build_stream : it->second.join_build_streams) + { + if (auto * p_stream = dynamic_cast(join_build_stream.get()); p_stream) + process_time_for_build = std::max(process_time_for_build, p_stream->getProfileInfo().execution_time); + } + current.time_processed_ns += process_time_for_build; + } + } + } + + current.time_processed_ns += 
dag_context.compile_time_ns; + fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id, delta_mode); + }; + + /// add execution_summary for local executor + if (dag_context.return_executor_id) + { + for (auto & p : dag_context.getProfileStreamsMap()) + fill_execution_summary(p.first, p.second); + } + else + { + const auto & profile_streams_map = dag_context.getProfileStreamsMap(); + assert(profile_streams_map.size() == dag_context.list_based_executors_order.size()); + for (const auto & executor_id : dag_context.list_based_executors_order) + { + auto it = profile_streams_map.find(executor_id); + assert(it != profile_streams_map.end()); + fill_execution_summary(executor_id, it->second); + } + } + + for (auto & p : merged_remote_execution_summaries) + { + if (local_executors.find(p.first) == local_executors.end()) + { + ExecutionSummary merged; + for (auto & remote : p.second) + merged.merge(remote, false); + fillTiExecutionSummary(response.add_execution_summaries(), merged, p.first, delta_mode); + } + } +} +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h new file mode 100644 index 00000000000..c537f3979e0 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h @@ -0,0 +1,49 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include + +namespace DB +{ +class ExecutionSummaryCollector +{ +public: + explicit ExecutionSummaryCollector( + DAGContext & dag_context_) + : dag_context(dag_context_) + { + for (auto & p : dag_context.getProfileStreamsMap()) + { + local_executors.insert(p.first); + } + } + + void addExecuteSummaries(tipb::SelectResponse & response, bool delta_mode); + +private: + void fillTiExecutionSummary( + tipb::ExecutorExecutionSummary * execution_summary, + ExecutionSummary & current, + const String & executor_id, + bool delta_mode); + +private: + DAGContext & dag_context; + std::unordered_map previous_execution_stats; + std::unordered_set local_executors; +}; +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp index ab0852a1a59..73eddbf174a 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp @@ -21,8 +21,6 @@ #include #include -#include - namespace DB { namespace ErrorCodes @@ -100,7 +98,7 @@ void StreamingDAGResponseWriter::encodeThenWriteBlocks() { TrackedSelectResp response; if constexpr (send_exec_summary_at_last) - addExecuteSummaries(response.getResponse(), /*delta_mode=*/true); + summary_collector.addExecuteSummaries(response.getResponse(), /*delta_mode=*/true); response.setEncodeType(dag_context.encode_type); if (blocks.empty()) { diff --git a/dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp index 886fb1410ae..04d6dc2fc4e 100644 --- a/dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp @@ -51,7 +51,7 @@ UnaryDAGResponseWriter::UnaryDAGResponseWriter( void UnaryDAGResponseWriter::encodeChunkToDAGResponse() { - auto dag_chunk = dag_response->add_chunks(); + auto * dag_chunk = dag_response->add_chunks(); 
dag_chunk->set_rows_data(chunk_codec_stream->getString()); chunk_codec_stream->clear(); current_records_num = 0; @@ -63,7 +63,7 @@ void UnaryDAGResponseWriter::appendWarningsToDAGResponse() dag_context.consumeWarnings(warnings); for (auto & warning : warnings) { - auto warn = dag_response->add_warnings(); + auto * warn = dag_response->add_warnings(); // TODO: consider using allocated warnings to prevent copy? warn->CopyFrom(warning); } @@ -77,7 +77,7 @@ void UnaryDAGResponseWriter::finishWrite() encodeChunkToDAGResponse(); } appendWarningsToDAGResponse(); - addExecuteSummaries(*dag_response, false); + summary_collector.addExecuteSummaries(*dag_response, false); } void UnaryDAGResponseWriter::write(const Block & block) diff --git a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp index 69ec6f94a12..79e8f518daf 100644 --- a/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp +++ b/dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp @@ -73,7 +73,7 @@ void BroadcastOrPassThroughWriter::encodeThenWriteBlocks() if constexpr (send_exec_summary_at_last) { TrackedSelectResp response; - addExecuteSummaries(response.getResponse(), /*delta_mode=*/false); + summary_collector.addExecuteSummaries(response.getResponse(), /*delta_mode=*/false); tracked_packet.serializeByResponse(response.getResponse()); } if (blocks.empty()) diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp index 4c21468d09d..ea63ee48ba7 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp @@ -146,7 +146,7 @@ void FineGrainedShuffleWriter::writePackets(std::vector::writePackets(std::vector