From ba882a33c46332e6bb190ce1a3d24c2b99ea7111 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 22 Dec 2022 15:50:55 +0800 Subject: [PATCH] refine remote execution summary (#6349) ref pingcap/tiflash#5900 --- .../DataStreams/TiRemoteBlockInputStream.h | 88 +-------- dbms/src/Flash/Coprocessor/DAGContext.h | 1 + .../Flash/Coprocessor/ExecutionSummary.cpp | 40 ++-- dbms/src/Flash/Coprocessor/ExecutionSummary.h | 7 +- .../Coprocessor/ExecutionSummaryCollector.cpp | 175 +++++++----------- .../Coprocessor/ExecutionSummaryCollector.h | 19 +- .../Coprocessor/RemoteExecutionSummary.cpp | 59 ++++++ .../Coprocessor/RemoteExecutionSummary.h | 33 ++++ .../gtest_ti_remote_block_inputstream.cpp | 9 +- .../Flash/tests/gtest_execution_summary.cpp | 140 ++++++++++++++ dbms/src/Storages/DeltaMerge/ScanContext.h | 11 ++ dbms/src/TestUtils/ExecutorTestUtils.cpp | 11 +- dbms/src/TestUtils/ExecutorTestUtils.h | 2 + 13 files changed, 379 insertions(+), 216 deletions(-) create mode 100644 dbms/src/Flash/Coprocessor/RemoteExecutionSummary.cpp create mode 100644 dbms/src/Flash/Coprocessor/RemoteExecutionSummary.h create mode 100644 dbms/src/Flash/tests/gtest_execution_summary.cpp diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index a0b90464dff..124f08d65c4 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -19,8 +19,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -50,15 +50,10 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream String name; - /// this atomic variable is kind of a lock for the struct of execution_summaries: - /// if execution_summaries_inited[index] = true, the map execution_summaries[index] - /// itself will not be modified, so ExecutionSummaryCollector can read it safely, otherwise, - /// ExecutionSummaryCollector will just skip execution_summaries[index] - std::vector> 
execution_summaries_inited; - std::vector> execution_summaries; - const LoggerPtr log; + RemoteExecutionSummary remote_execution_summary; + uint64_t total_rows; // For fine grained shuffle, sender will partition data into muiltiple streams by hashing. @@ -68,64 +63,6 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream std::unique_ptr decoder_ptr; - void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index) - { - for (const auto & execution_summary : resp.execution_summaries()) - { - if (likely(execution_summary.has_executor_id())) - { - auto & remote_execution_summary = execution_summaries[index][execution_summary.executor_id()]; - remote_execution_summary.time_processed_ns = execution_summary.time_processed_ns(); - remote_execution_summary.num_produced_rows = execution_summary.num_produced_rows(); - remote_execution_summary.num_iterations = execution_summary.num_iterations(); - remote_execution_summary.concurrency = execution_summary.concurrency(); - DM::ScanContext scan_context; - scan_context.deserialize(execution_summary.tiflash_scan_context()); - remote_execution_summary.scan_context->merge(scan_context); - } - } - execution_summaries_inited[index].store(true); - } - - void addRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index) - { - if (unlikely(resp.execution_summaries_size() == 0)) - return; - - if (!execution_summaries_inited[index].load()) - { - initRemoteExecutionSummaries(resp, index); - return; - } - if constexpr (is_streaming_reader) - throw Exception( - fmt::format( - "There are more than one execution summary packet of index {} in streaming reader, " - "this should not happen", - index)); - auto & execution_summaries_map = execution_summaries[index]; - for (const auto & execution_summary : resp.execution_summaries()) - { - if (likely(execution_summary.has_executor_id())) - { - const auto & executor_id = execution_summary.executor_id(); - if (unlikely(execution_summaries_map.find(executor_id) == 
execution_summaries_map.end())) - { - LOG_WARNING(log, "execution {} not found in execution_summaries, this should not happen", executor_id); - continue; - } - auto & remote_execution_summary = execution_summaries_map[executor_id]; - remote_execution_summary.time_processed_ns = std::max(remote_execution_summary.time_processed_ns, execution_summary.time_processed_ns()); - remote_execution_summary.num_produced_rows += execution_summary.num_produced_rows(); - remote_execution_summary.num_iterations += execution_summary.num_iterations(); - remote_execution_summary.concurrency += execution_summary.concurrency(); - DM::ScanContext scan_context; - scan_context.deserialize(execution_summary.tiflash_scan_context()); - remote_execution_summary.scan_context->merge(scan_context); - } - } - } - bool fetchRemoteResult() { while (true) @@ -147,14 +84,13 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream throw Exception(result.resp->error().DebugString()); } - size_t index = 0; - if constexpr (is_streaming_reader) - index = result.call_index; - /// only the last response contains execution summaries if (result.resp != nullptr) - addRemoteExecutionSummaries(*result.resp, index); + remote_execution_summary.add(*result.resp); + size_t index = 0; + if constexpr (is_streaming_reader) + index = result.call_index; const auto & decode_detail = result.decode_detail; auto & connection_profile_info = connection_profile_infos[index]; connection_profile_info.packets += decode_detail.packets; @@ -179,16 +115,10 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream : remote_reader(remote_reader_) , source_num(remote_reader->getSourceNum()) , name(fmt::format("TiRemote({})", RemoteReader::name)) - , execution_summaries_inited(source_num) , log(Logger::get(name, req_id, executor_id)) , total_rows(0) , stream_id(stream_id_) { - for (size_t i = 0; i < source_num; ++i) - { - execution_summaries_inited[i].store(false); - } - execution_summaries.resize(source_num); 
connection_profile_infos.resize(source_num); sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema()))); static constexpr size_t squash_rows_limit = 8192; @@ -228,9 +158,9 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream return block; } - const std::unordered_map * getRemoteExecutionSummaries(size_t index) + const RemoteExecutionSummary & getRemoteExecutionSummary() { - return execution_summaries_inited[index].load() ? &execution_summaries[index] : nullptr; + return remote_execution_summary; } size_t getTotalRows() const { return total_rows; } diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index aaf218ba24e..ce8b93e92d2 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -193,6 +193,7 @@ class DAGContext , dummy_query_string(dag_request->DebugString()) , dummy_ast(makeDummyQuery()) , initialize_concurrency(concurrency) + , collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries()) , is_mpp_task(true) , is_root_mpp_task(false) , log(Logger::get(log_identifier)) diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp b/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp index a62693f1aec..818d0edfbea 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp +++ b/dbms/src/Flash/Coprocessor/ExecutionSummary.cpp @@ -16,24 +16,30 @@ namespace DB { +void ExecutionSummary::merge(const ExecutionSummary & other) +{ + time_processed_ns = std::max(time_processed_ns, other.time_processed_ns); + num_produced_rows += other.num_produced_rows; + num_iterations += other.num_iterations; + concurrency += other.concurrency; + scan_context->merge(*other.scan_context); +} + +void ExecutionSummary::merge(const tipb::ExecutorExecutionSummary & other) +{ + time_processed_ns = std::max(time_processed_ns, other.time_processed_ns()); + num_produced_rows += 
other.num_produced_rows(); + num_iterations += other.num_iterations(); + concurrency += other.concurrency(); + scan_context->merge(other.tiflash_scan_context()); +} -void ExecutionSummary::merge(const ExecutionSummary & other, bool streaming_call) +void ExecutionSummary::init(const tipb::ExecutorExecutionSummary & other) { - if (streaming_call) - { - time_processed_ns = std::max(time_processed_ns, other.time_processed_ns); - num_produced_rows = std::max(num_produced_rows, other.num_produced_rows); - num_iterations = std::max(num_iterations, other.num_iterations); - concurrency = std::max(concurrency, other.concurrency); - scan_context->merge(*other.scan_context); - } - else - { - time_processed_ns = std::max(time_processed_ns, other.time_processed_ns); - num_produced_rows += other.num_produced_rows; - num_iterations += other.num_iterations; - concurrency += other.concurrency; - scan_context->merge(*other.scan_context); - } + time_processed_ns = other.time_processed_ns(); + num_produced_rows = other.num_produced_rows(); + num_iterations = other.num_iterations(); + concurrency = other.concurrency(); + scan_context->deserialize(other.tiflash_scan_context()); } } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummary.h b/dbms/src/Flash/Coprocessor/ExecutionSummary.h index eafeaeed292..5a8ce579a6e 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummary.h +++ b/dbms/src/Flash/Coprocessor/ExecutionSummary.h @@ -16,6 +16,7 @@ #include #include +#include #include @@ -29,11 +30,13 @@ struct ExecutionSummary UInt64 num_iterations = 0; UInt64 concurrency = 0; - std::unique_ptr scan_context = std::make_unique(); + DM::ScanContextPtr scan_context = std::make_shared(); ExecutionSummary() = default; - void merge(const ExecutionSummary & other, bool streaming_call); + void merge(const ExecutionSummary & other); + void merge(const tipb::ExecutorExecutionSummary & other); + void init(const tipb::ExecutorExecutionSummary & other); }; } // namespace DB diff --git 
a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp index c21c839760c..86122e400b2 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp +++ b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.cpp @@ -14,14 +14,31 @@ #include #include +#include #include -#include -#include - -#include +#include namespace DB { +namespace +{ +RemoteExecutionSummary getRemoteExecutionSummariesFromExchange(DAGContext & dag_context) +{ + RemoteExecutionSummary exchange_execution_summary; + for (const auto & map_entry : dag_context.getInBoundIOInputStreamsMap()) + { + for (const auto & stream_ptr : map_entry.second) + { + if (auto * exchange_receiver_stream_ptr = dynamic_cast(stream_ptr.get()); exchange_receiver_stream_ptr) + { + exchange_execution_summary.merge(exchange_receiver_stream_ptr->getRemoteExecutionSummary()); + } + } + } + return exchange_execution_summary; +} +} // namespace + void ExecutionSummaryCollector::fillTiExecutionSummary( tipb::ExecutorExecutionSummary * execution_summary, ExecutionSummary & current, @@ -37,29 +54,6 @@ void ExecutionSummaryCollector::fillTiExecutionSummary( execution_summary->set_executor_id(executor_id); } -template -void mergeRemoteExecuteSummaries( - RemoteBlockInputStream * input_stream, - std::unordered_map> & execution_summaries) -{ - size_t source_num = input_stream->getSourceNum(); - for (size_t s_index = 0; s_index < source_num; ++s_index) - { - auto remote_execution_summaries = input_stream->getRemoteExecutionSummaries(s_index); - if (remote_execution_summaries == nullptr) - continue; - bool is_streaming_call = input_stream->isStreamingCall(); - for (auto & p : *remote_execution_summaries) - { - if (execution_summaries[p.first].size() < source_num) - { - execution_summaries[p.first].resize(source_num); - } - execution_summaries[p.first][s_index].merge(p.second, is_streaming_call); - } - } -} - tipb::SelectResponse 
ExecutionSummaryCollector::genExecutionSummaryResponse() { tipb::SelectResponse response; @@ -67,89 +61,67 @@ tipb::SelectResponse ExecutionSummaryCollector::genExecutionSummaryResponse() return response; } -void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & response) +void ExecutionSummaryCollector::fillLocalExecutionSummary( + tipb::SelectResponse & response, + const String & executor_id, + const BlockInputStreams & streams, + const std::unordered_map & scan_context_map) const { - if (!dag_context.collect_execution_summaries) - return; - /// get executionSummary info from remote input streams - std::unordered_map> merged_remote_execution_summaries; - for (const auto & map_entry : dag_context.getInBoundIOInputStreamsMap()) + ExecutionSummary current; + /// part 1: local execution info + // get execution info from streams + for (const auto & stream_ptr : streams) { - for (const auto & stream_ptr : map_entry.second) + if (auto * p_stream = dynamic_cast(stream_ptr.get())) { - if (auto * exchange_receiver_stream_ptr = dynamic_cast(stream_ptr.get())) - { - mergeRemoteExecuteSummaries(exchange_receiver_stream_ptr, merged_remote_execution_summaries); - } - else if (auto * cop_stream_ptr = dynamic_cast(stream_ptr.get())) - { - mergeRemoteExecuteSummaries(cop_stream_ptr, merged_remote_execution_summaries); - } - else - { - /// local read input stream - } + current.time_processed_ns = std::max(current.time_processed_ns, p_stream->getProfileInfo().execution_time); + current.num_produced_rows += p_stream->getProfileInfo().rows; + current.num_iterations += p_stream->getProfileInfo().blocks; } + ++current.concurrency; } - - auto fill_execution_summary = [&](const String & executor_id, const BlockInputStreams & streams, const std::unordered_map & scan_context_map) { - ExecutionSummary current; - /// part 1: local execution info - // get execution info from streams - for (const auto & stream_ptr : streams) - { - if (auto * p_stream = 
dynamic_cast(stream_ptr.get())) - { - current.time_processed_ns = std::max(current.time_processed_ns, p_stream->getProfileInfo().execution_time); - current.num_produced_rows += p_stream->getProfileInfo().rows; - current.num_iterations += p_stream->getProfileInfo().blocks; - } - current.concurrency++; - } - // get execution info from scan_context - if (const auto & iter = scan_context_map.find(executor_id); iter != scan_context_map.end()) - { - current.scan_context->merge(*(iter->second)); - } - - /// part 2: remote execution info - if (merged_remote_execution_summaries.find(executor_id) != merged_remote_execution_summaries.end()) - { - for (auto & remote : merged_remote_execution_summaries[executor_id]) - current.merge(remote, false); - } - /// part 3: for join need to add the build time - /// In TiFlash, a hash join's build side is finished before probe side starts, - /// so the join probe side's running time does not include hash table's build time, - /// when construct ExecSummaries, we need add the build cost to probe executor - auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(executor_id); - if (all_join_id_it != dag_context.getExecutorIdToJoinIdMap().end()) + // get execution info from scan_context + if (const auto & iter = scan_context_map.find(executor_id); iter != scan_context_map.end()) + { + current.scan_context->merge(*(iter->second)); + } + /// part 2: for join need to add the build time + /// In TiFlash, a hash join's build side is finished before probe side starts, + /// so the join probe side's running time does not include hash table's build time, + /// when construct ExecSummaries, we need add the build cost to probe executor + auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(executor_id); + if (all_join_id_it != dag_context.getExecutorIdToJoinIdMap().end()) + { + for (const auto & join_executor_id : all_join_id_it->second) { - for (const auto & join_executor_id : all_join_id_it->second) + auto it = 
dag_context.getJoinExecuteInfoMap().find(join_executor_id); + if (it != dag_context.getJoinExecuteInfoMap().end()) { - auto it = dag_context.getJoinExecuteInfoMap().find(join_executor_id); - if (it != dag_context.getJoinExecuteInfoMap().end()) + UInt64 process_time_for_build = 0; + for (const auto & join_build_stream : it->second.join_build_streams) { - UInt64 process_time_for_build = 0; - for (const auto & join_build_stream : it->second.join_build_streams) - { - if (auto * p_stream = dynamic_cast(join_build_stream.get()); p_stream) - process_time_for_build = std::max(process_time_for_build, p_stream->getProfileInfo().execution_time); - } - current.time_processed_ns += process_time_for_build; + if (auto * p_stream = dynamic_cast(join_build_stream.get()); p_stream) + process_time_for_build = std::max(process_time_for_build, p_stream->getProfileInfo().execution_time); } + current.time_processed_ns += process_time_for_build; } } + } - current.time_processed_ns += dag_context.compile_time_ns; - fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id); - }; + current.time_processed_ns += dag_context.compile_time_ns; + fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id); +} - /// add execution_summary for local executor +void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & response) +{ + if (!dag_context.collect_execution_summaries) + return; + + /// fill execution_summary for local executor if (dag_context.return_executor_id) { for (auto & p : dag_context.getProfileStreamsMap()) - fill_execution_summary(p.first, p.second, dag_context.scan_context_map); + fillLocalExecutionSummary(response, p.first, p.second, dag_context.scan_context_map); } else { @@ -159,19 +131,16 @@ void ExecutionSummaryCollector::addExecuteSummaries(tipb::SelectResponse & respo { auto it = profile_streams_map.find(executor_id); assert(it != profile_streams_map.end()); - fill_execution_summary(executor_id, it->second, 
dag_context.scan_context_map); + fillLocalExecutionSummary(response, executor_id, it->second, dag_context.scan_context_map); } } - for (auto & p : merged_remote_execution_summaries) + // TODO support cop remote read and disaggregated mode. + auto exchange_execution_summary = getRemoteExecutionSummariesFromExchange(dag_context); + // fill execution_summary to response for remote executor received by exchange. + for (auto & p : exchange_execution_summary.execution_summaries) { - if (local_executors.find(p.first) == local_executors.end()) - { - ExecutionSummary merged; - for (auto & remote : p.second) - merged.merge(remote, false); - fillTiExecutionSummary(response.add_execution_summaries(), merged, p.first); - } + fillTiExecutionSummary(response.add_execution_summaries(), p.second, p.first); } } } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h index dedd488d125..dc5a64e723b 100644 --- a/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h +++ b/dbms/src/Flash/Coprocessor/ExecutionSummaryCollector.h @@ -14,23 +14,21 @@ #pragma once -#include +#include #include +#include namespace DB { +class DAGContext; + class ExecutionSummaryCollector { public: explicit ExecutionSummaryCollector( DAGContext & dag_context_) : dag_context(dag_context_) - { - for (auto & p : dag_context.getProfileStreamsMap()) - { - local_executors.insert(p.first); - } - } + {} void addExecuteSummaries(tipb::SelectResponse & response); @@ -42,8 +40,13 @@ class ExecutionSummaryCollector ExecutionSummary & current, const String & executor_id) const; + void fillLocalExecutionSummary( + tipb::SelectResponse & response, + const String & executor_id, + const BlockInputStreams & streams, + const std::unordered_map & scan_context_map) const; + private: DAGContext & dag_context; - std::unordered_set local_executors; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/RemoteExecutionSummary.cpp
b/dbms/src/Flash/Coprocessor/RemoteExecutionSummary.cpp new file mode 100644 index 00000000000..fc88afcf700 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/RemoteExecutionSummary.cpp @@ -0,0 +1,59 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB +{ +void RemoteExecutionSummary::merge(const RemoteExecutionSummary & other) +{ + for (const auto & p : other.execution_summaries) + { + const auto & executor_id = p.first; + auto it = execution_summaries.find(executor_id); + if (unlikely(it == execution_summaries.end())) + { + execution_summaries[executor_id] = p.second; + } + else + { + it->second.merge(p.second); + } + } +} + +void RemoteExecutionSummary::add(tipb::SelectResponse & resp) +{ + if (unlikely(resp.execution_summaries_size() == 0)) + return; + + for (const auto & execution_summary : resp.execution_summaries()) + { + if (likely(execution_summary.has_executor_id())) + { + const auto & executor_id = execution_summary.executor_id(); + auto it = execution_summaries.find(executor_id); + if (unlikely(it == execution_summaries.end())) + { + execution_summaries[executor_id].init(execution_summary); + } + else + { + it->second.merge(execution_summary); + } + } + } +} +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/RemoteExecutionSummary.h b/dbms/src/Flash/Coprocessor/RemoteExecutionSummary.h new file mode 100644 index 00000000000..dd2a9d0b5bf --- /dev/null +++ 
b/dbms/src/Flash/Coprocessor/RemoteExecutionSummary.h @@ -0,0 +1,33 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include + +namespace DB +{ +struct RemoteExecutionSummary +{ + void merge(const RemoteExecutionSummary & other); + + void add(tipb::SelectResponse & resp); + + // key: executor_id, value: ExecutionSummary + std::unordered_map execution_summaries; +}; +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 2434feeba26..0162b940ce4 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -398,11 +398,10 @@ class TestTiRemoteBlockInputStream : public testing::Test { assert(receiver_stream); /// Check Execution Summary - const auto * summary = receiver_stream->getRemoteExecutionSummaries(0); - ASSERT_TRUE(summary != nullptr); - ASSERT_EQ(summary->size(), 1); - ASSERT_EQ(summary->begin()->first, "Executor_0"); - ASSERT_TRUE(equalSummaries(writer->mockExecutionSummary(), summary->begin()->second)); + const auto & summary = receiver_stream->getRemoteExecutionSummary(); + ASSERT_EQ(summary.execution_summaries.size(), 1); + ASSERT_EQ(summary.execution_summaries.begin()->first, "Executor_0"); + ASSERT_TRUE(equalSummaries(writer->mockExecutionSummary(),
summary.execution_summaries.begin()->second)); /// Check Connection Info auto infos = receiver_stream->getConnectionProfileInfos(); diff --git a/dbms/src/Flash/tests/gtest_execution_summary.cpp b/dbms/src/Flash/tests/gtest_execution_summary.cpp new file mode 100644 index 00000000000..e010cfb5a53 --- /dev/null +++ b/dbms/src/Flash/tests/gtest_execution_summary.cpp @@ -0,0 +1,140 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB +{ +namespace tests +{ +class ExecutionSummaryTestRunner : public DB::tests::ExecutorTest +{ +public: + void initializeContext() override + { + ExecutorTest::initializeContext(); + context.addMockTable({"test_db", "test_table"}, + {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}, + {toNullableVec("s1", {"banana", {}, "banana", "banana", {}, "banana", "banana", {}, "banana", "banana", {}, "banana"}), + toNullableVec("s2", {"apple", {}, "banana", "apple", {}, "banana", "apple", {}, "banana", "apple", {}, "banana"})}); + context.addExchangeReceiver("test_exchange", + {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}, + {toNullableVec("s1", {"banana", {}, "banana", "banana", {}, "banana", "banana", {}, "banana", "banana", {}, "banana"}), + toNullableVec("s2", {"apple", {}, "banana", "apple", {}, "banana", "apple", {}, "banana", "apple", {}, "banana"})}); + } + + static constexpr size_t concurrency = 10; + static constexpr int 
not_check_rows = -1; + // (num_produced_rows, concurrency) + using ProfileInfo = std::pair; + using Expect = std::unordered_map; + void testForExecutionSummary( + const std::shared_ptr & request, + const Expect & expect) + { + request->set_collect_execution_summaries(true); + DAGContext dag_context(*request, "test_execution_summary", concurrency); + executeStreams(&dag_context); + ASSERT_EQ(dag_context.getProfileStreamsMap().size(), expect.size()); + ASSERT_TRUE(dag_context.collect_execution_summaries); + ExecutionSummaryCollector summary_collector(dag_context); + auto summaries = summary_collector.genExecutionSummaryResponse().execution_summaries(); + ASSERT_EQ(summaries.size(), expect.size()); + for (const auto & summary : summaries) + { + ASSERT_TRUE(summary.has_executor_id()); + auto it = expect.find(summary.executor_id()); + ASSERT_TRUE(it != expect.end()) << fmt::format("unknown executor_id: {}", summary.executor_id()); + if (it->second.first != not_check_rows) + ASSERT_EQ(summary.num_produced_rows(), it->second.first) << fmt::format("executor_id: {}", summary.executor_id()); + ASSERT_EQ(summary.concurrency(), it->second.second) << fmt::format("executor_id: {}", summary.executor_id()); + // time_processed_ns, num_iterations and tiflash_scan_context are not checked here.
+ } + } +}; + +TEST_F(ExecutionSummaryTestRunner, test) +try +{ + { + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .build(context); + Expect expect{{"table_scan_0", {12, concurrency}}, {"selection_1", {4, concurrency}}}; + testForExecutionSummary(request, expect); + } + { + auto request = context + .scan("test_db", "test_table") + .limit(5) + .build(context); + Expect expect{{"table_scan_0", {not_check_rows, concurrency}}, {"limit_1", {5, 1}}}; + testForExecutionSummary(request, expect); + } + { + auto request = context + .scan("test_db", "test_table") + .topN("s1", true, 5) + .build(context); + Expect expect{{"table_scan_0", {not_check_rows, concurrency}}, {"topn_1", {5, 1}}}; + testForExecutionSummary(request, expect); + } + { + auto request = context + .scan("test_db", "test_table") + .project({col("s2")}) + .build(context); + Expect expect{{"table_scan_0", {12, concurrency}}, {"project_1", {12, concurrency}}}; + testForExecutionSummary(request, expect); + } + { + auto request = context + .scan("test_db", "test_table") + .aggregation({col("s2")}, {col("s2")}) + .build(context); + Expect expect{{"table_scan_0", {12, concurrency}}, {"aggregation_1", {3, concurrency}}}; + testForExecutionSummary(request, expect); + } + { + auto t1 = context.scan("test_db", "test_table"); + auto t2 = context.scan("test_db", "test_table"); + auto request = t1.join(t2, tipb::JoinType::TypeInnerJoin, {col("s1")}).build(context); + Expect expect{{"table_scan_0", {12, concurrency}}, {"table_scan_1", {12, concurrency}}, {"Join_2", {64, concurrency}}}; + testForExecutionSummary(request, expect); + } + { + auto request = context + .receive("test_exchange") + .exchangeSender(tipb::Hash) + .build(context); + Expect expect{{"exchange_receiver_0", {12, concurrency}}, {"exchange_sender_1", {12, concurrency}}}; + testForExecutionSummary(request, expect); + } + { + auto request = context + .receive("test_exchange") + .sort({{"s1", false}, {"s2", 
false}, {"s1", false}, {"s2", false}}, true) + .window(RowNumber(), {"s1", false}, {"s2", false}, buildDefaultRowsFrame()) + .build(context); + Expect expect{{"exchange_receiver_0", {12, concurrency}}, {"sort_1", {12, 1}}, {"window_2", {12, 1}}}; + testForExecutionSummary(request, expect); + } +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.h b/dbms/src/Storages/DeltaMerge/ScanContext.h index 590223caef6..63d2081092c 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.h +++ b/dbms/src/Storages/DeltaMerge/ScanContext.h @@ -81,6 +81,17 @@ class ScanContext total_dmfile_read_time_ms += other.total_dmfile_read_time_ms; total_create_snapshot_time_ms += other.total_create_snapshot_time_ms; } + + void merge(const tipb::TiFlashScanContext & other) + { + total_dmfile_scanned_packs += other.total_dmfile_scanned_packs(); + total_dmfile_skipped_packs += other.total_dmfile_skipped_packs(); + total_dmfile_scanned_rows += other.total_dmfile_scanned_rows(); + total_dmfile_skipped_rows += other.total_dmfile_skipped_rows(); + total_dmfile_rough_set_index_load_time_ms += other.total_dmfile_rough_set_index_load_time_ms(); + total_dmfile_read_time_ms += other.total_dmfile_read_time_ms(); + total_create_snapshot_time_ms += other.total_create_snapshot_time_ms(); + } }; using ScanContextPtr = std::shared_ptr; diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index 90fa363fe4d..dc279f3ea7f 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -222,12 +222,19 @@ void ExecutorTest::enablePlanner(bool is_enable) context.context.setSetting("enable_planner", is_enable ? 
"true" : "false"); } -DB::ColumnsWithTypeAndName ExecutorTest::executeStreams(const std::shared_ptr & request, size_t concurrency) +DB::ColumnsWithTypeAndName ExecutorTest::executeStreams( + const std::shared_ptr & request, + size_t concurrency) { DAGContext dag_context(*request, "executor_test", concurrency); + return executeStreams(&dag_context); +} + +ColumnsWithTypeAndName ExecutorTest::executeStreams(DAGContext * dag_context) +{ context.context.setExecutorTest(); context.context.setMockStorage(context.mockStorage()); - context.context.setDAGContext(&dag_context); + context.context.setDAGContext(dag_context); // Currently, don't care about regions information in tests. Blocks blocks; queryExecute(context.context, /*internal=*/true)->execute([&blocks](const Block & block) { blocks.push_back(block); }).verify(); diff --git a/dbms/src/TestUtils/ExecutorTestUtils.h b/dbms/src/TestUtils/ExecutorTestUtils.h index acb239a96df..ee014e4b069 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.h +++ b/dbms/src/TestUtils/ExecutorTestUtils.h @@ -94,6 +94,8 @@ class ExecutorTest : public ::testing::Test } } + ColumnsWithTypeAndName executeStreams(DAGContext * dag_context); + ColumnsWithTypeAndName executeStreams( const std::shared_ptr & request, size_t concurrency = 1);