diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index f700b8fbe2f..49ea60e1943 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -58,15 +57,7 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, TablesRegionsInfo && tab , warning_count(0) , keyspace_id(keyspace_id_) { - RUNTIME_CHECK((dag_request->executors_size() > 0) != isTreeBasedExecutors()); - const auto & root_executor = isTreeBasedExecutors() - ? dag_request->root_executor() - : dag_request->executors(dag_request->executors_size() - 1); - return_executor_id = root_executor.has_executor_id(); - if (return_executor_id) - root_executor_id = root_executor.executor_id(); initOutputInfo(); - initListBasedExecutors(); } // for mpp @@ -75,7 +66,6 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & me , dummy_query_string(dag_request->DebugString()) , dummy_ast(makeDummyQuery()) , collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries()) - , return_executor_id(true) , is_mpp_task(true) , is_root_mpp_task(is_root_mpp_task_) , flags(dag_request->flags()) @@ -87,12 +77,9 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & me , warning_count(0) , keyspace_id(RequestUtils::deriveKeyspaceID(meta_)) { - RUNTIME_CHECK(isTreeBasedExecutors() && dag_request->root_executor().has_executor_id()); - root_executor_id = dag_request->root_executor().executor_id(); // only mpp task has join executor. initExecutorIdToJoinIdMap(); initOutputInfo(); - initListBasedExecutors(); } // for disaggregated task on write node @@ -116,11 +103,7 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const disaggregated::Dis , warning_count(0) , keyspace_id(RequestUtils::deriveKeyspaceID(task_meta_)) { - RUNTIME_CHECK(isTreeBasedExecutors() && dag_request->root_executor().has_executor_id()); - return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id(); - initOutputInfo(); - initListBasedExecutors(); } // for test @@ -153,15 +136,7 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, s , warnings(max_recorded_error_count) , warning_count(0) { - RUNTIME_CHECK((dag_request->executors_size() > 0) != isTreeBasedExecutors()); - const auto & root_executor = isTreeBasedExecutors() - ? dag_request->root_executor() - : dag_request->executors(dag_request->executors_size() - 1); - return_executor_id = root_executor.has_executor_id(); - if (return_executor_id) - root_executor_id = root_executor.executor_id(); initOutputInfo(); - initListBasedExecutors(); } void DAGContext::initOutputInfo() @@ -182,29 +157,6 @@ void DAGContext::initOutputInfo() keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock; } - -void DAGContext::initListBasedExecutors() -{ - if (!isTreeBasedExecutors()) - { - ExecutorIdGenerator id_generator; - for (int i = 0; i < dag_request->executors_size(); ++i) - { - auto * executor = dag_request->mutable_executors(i); - const auto & executor_id = id_generator.generate(*executor); - list_based_executors_order.push_back(executor_id); - // Set executor_id for list based executor, - // then we can fill executor_id for Execution Summaries of list-based executors - executor->set_executor_id(executor_id); - } - } -} - -bool DAGContext::isTreeBasedExecutors() const -{ - return dag_request->has_root_executor(); -} - bool DAGContext::allowZeroInDate() const { return flags & TiDBSQLFlags::IGNORE_ZERO_IN_DATE; @@ -240,7 +192,7 @@ void DAGContext::initExecutorIdToJoinIdMap() return; executor_id_to_join_id_map.clear(); - traverseExecutorsReverse(dag_request, [&](const tipb::Executor & executor) { + dag_request.traverseReverse([&](const tipb::Executor & executor) { std::vector all_join_id; // for mpp, dag_request.has_root_executor() == true, can call `getChildren` directly. getChildren(executor).forEach([&](const tipb::Executor & child) { diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 05d7708c27d..a330dd3be50 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -20,13 +20,13 @@ #pragma GCC diagnostic ignored "-Wdeprecated-declarations" #endif #include -#include #pragma GCC diagnostic pop #include #include #include #include +#include #include #include #include @@ -274,7 +274,7 @@ class DAGContext KeyspaceID getKeyspaceID() const { return keyspace_id; } - tipb::DAGRequest * dag_request; + DAGRequest dag_request; /// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast, /// dummy_query_string and dummy_ast is used for that String dummy_query_string; @@ -291,8 +291,6 @@ class DAGContext // For disaggregated read, this is the host of compute node String tidb_host = "Unknown"; bool collect_execution_summaries{}; - bool return_executor_id{}; - String root_executor_id; /* const */ bool is_mpp_task = false; /* const */ bool is_root_mpp_task = false; /* const */ bool is_batch_cop = false; @@ -313,10 +311,6 @@ class DAGContext std::vector output_field_types; std::vector output_offsets; - /// Hold the order of list based executors. - /// It is used to ensure that the order of Execution summary of list based executors is the same as the order of list based executors. - std::vector list_based_executors_order; - /// executor_id, ScanContextPtr /// Currently, max(scan_context_map.size()) == 1, because one mpp task only have do one table scan /// While when we support collcate join later, scan_context_map.size() may > 1, @@ -326,8 +320,6 @@ class DAGContext private: void initExecutorIdToJoinIdMap(); void initOutputInfo(); - void initListBasedExecutors(); - bool isTreeBasedExecutors() const; private: std::shared_ptr process_list_entry; diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp index d7d1b9625ab..a14e48ef8f9 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp @@ -26,15 +26,15 @@ namespace DB DAGQuerySource::DAGQuerySource(Context & context_) : context(context_) { - const tipb::DAGRequest & dag_request = *getDAGContext().dag_request; - if (dag_request.has_root_executor()) + const auto & dag_request = getDAGContext().dag_request; + if (dag_request.isTreeBased()) { QueryBlockIDGenerator id_generator; - root_query_block = std::make_shared(dag_request.root_executor(), id_generator); + root_query_block = std::make_shared(dag_request->root_executor(), id_generator); } else { - root_query_block = std::make_shared(1, dag_request.executors()); + root_query_block = std::make_shared(1, dag_request->executors()); } } diff --git a/dbms/src/Flash/Coprocessor/DAGRequest.cpp b/dbms/src/Flash/Coprocessor/DAGRequest.cpp new file mode 100644 index 00000000000..cecbb090aff --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGRequest.cpp @@ -0,0 +1,173 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB +{ +namespace +{ +void check(bool condition, const String & err_msg) +{ + if unlikely (!condition) + throw TiFlashException(err_msg, Errors::Coprocessor::BadRequest); +} + +class ListBasedExecutorIdGenerator +{ +public: + explicit ListBasedExecutorIdGenerator(tipb::DAGRequest * dag_request) + { + assert(dag_request->executors_size() > 0); + // To check duplicate executor_id for list based request. + // Normally list based requests do not set the executor id, but here it is just in case. + for (int i = 0; i < dag_request->executors_size(); ++i) + { + const auto & executor = dag_request->executors(i); + if (executor.has_executor_id()) + { + const auto & executor_id = executor.executor_id(); + check(executor_id_set.find(executor_id) == executor_id_set.end(), fmt::format("in list based request, executor id `{}` duplicate, which is unexpected.", executor_id)); + executor_id_set.insert(executor_id); + } + } + } + + String generate(const tipb::Executor & executor) + { + // Has checked in constructor. + if (executor.has_executor_id()) + return executor.executor_id(); + + for (size_t i = 0; i < 5; ++i) + { + auto gen_id = doGenerate(executor); + if (executor_id_set.find(gen_id) == executor_id_set.end()) + { + executor_id_set.insert(gen_id); + return gen_id; + } + } + throw Exception(fmt::format("We have failed five times to generate a unique id for list base executor, exists ids are: [{}]", fmt::join(executor_id_set, ","))); + } + +private: + String doGenerate(const tipb::Executor & executor) + { + assert(!executor.has_executor_id()); + switch (executor.tp()) + { + case tipb::ExecType::TypeSelection: + return fmt::format("selection_{}", current_id++); + case tipb::ExecType::TypeProjection: + return fmt::format("project_{}", current_id++); + case tipb::ExecType::TypeStreamAgg: + case tipb::ExecType::TypeAggregation: + return fmt::format("aggregation_{}", current_id++); + case tipb::ExecType::TypeTopN: + return fmt::format("topn_{}", current_id++); + case tipb::ExecType::TypeLimit: + return fmt::format("limit_{}", current_id++); + case tipb::ExecType::TypeExchangeSender: + return fmt::format("exchange_sender_{}", current_id++); + case tipb::ExecType::TypeExchangeReceiver: + return fmt::format("exchange_receiver_{}", current_id++); + case tipb::ExecType::TypeTableScan: + case tipb::ExecType::TypePartitionTableScan: + return fmt::format("table_scan_{}", current_id++); + case tipb::ExecType::TypeSort: + return fmt::format("sort_{}", current_id++); + case tipb::ExecType::TypeWindow: + return fmt::format("window_{}", current_id++); + case tipb::ExecType::TypeJoin: + return fmt::format("join_{}", current_id++); + case tipb::ExecType::TypeExpand: + return fmt::format("expand_{}", current_id++); + default: + throw TiFlashException( + fmt::format("Unsupported executor in list based DAG request: {}", executor.DebugString()), + Errors::Coprocessor::Unimplemented); + } + } + +private: + UInt32 current_id = 0; + std::unordered_set executor_id_set; +}; +} // namespace + +DAGRequest::DAGRequest(tipb::DAGRequest * dag_request_) + : dag_request(dag_request_) +{ + // Will only occur in tests. + if unlikely (!dag_request) + return; + + check((dag_request->executors_size() > 0) != dag_request->has_root_executor(), "dagrequest must be one of list based and tree based"); + is_tree_based = dag_request->has_root_executor(); + + checkOrSetExecutorId(); +} + +void DAGRequest::checkOrSetExecutorId() +{ + if (is_tree_based) + { + // check duplicate executor_id for tree based request. + std::unordered_set ids; + traverseExecutorTree(dag_request->root_executor(), [&](const tipb::Executor & executor) { + check(executor.has_executor_id(), "for tree based request, executor id cannot be null"); + const auto & executor_id = executor.executor_id(); + check(ids.find(executor_id) == ids.end(), fmt::format("in tree based request, executor id `{}` duplicate, which is unexpected.", executor_id)); + ids.insert(executor_id); + return true; + }); + } + else + { + // generate executor_id for list based request. + ListBasedExecutorIdGenerator id_generator{dag_request}; + for (int i = 0; i < dag_request->executors_size(); ++i) + { + auto * executor = dag_request->mutable_executors(i); + const auto & executor_id = id_generator.generate(*executor); + // Set executor_id for list based executor, + // then we can fill executor_id for Execution Summaries of list-based executors + executor->set_executor_id(executor_id); + // The begin of list_based_executors_order is the leaf node like table scan. + list_based_executors_order.push_back(executor_id); + } + } +} + +const tipb::Executor & DAGRequest::rootExecutor() const +{ + check(dag_request, "dagrequest can't be null"); + return isTreeBased() ? dag_request->root_executor() : dag_request->executors(dag_request->executors_size() - 1); +} + +void DAGRequest::traverse(std::function && func) const +{ + check(dag_request, "dagrequest can't be null"); + traverseExecutors(dag_request, std::move(func)); +} + +void DAGRequest::traverseReverse(std::function && func) const +{ + check(dag_request, "dagrequest can't be null"); + traverseExecutorsReverse(dag_request, std::move(func)); +} +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGRequest.h b/dbms/src/Flash/Coprocessor/DAGRequest.h new file mode 100644 index 00000000000..8924722a7e0 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGRequest.h @@ -0,0 +1,61 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include + +namespace DB +{ +// TODO move more requrest related codes from DAGContext to here. +class DAGRequest +{ +public: + explicit DAGRequest(tipb::DAGRequest * dag_request_); + + const tipb::DAGRequest * operator()() const { return dag_request; } + + const tipb::DAGRequest * operator->() const { return dag_request; } + + const tipb::DAGRequest & operator*() const { return *dag_request; } + + bool isTreeBased() const { return is_tree_based; } + + const tipb::Executor & rootExecutor() const; + + /// traverse tipb::executor of DAGRequest and apply function. + /// func: (const tipb::Executor &) -> bool, return true to continue traverse. + void traverse(std::function && func) const; + + /// traverse tipb::executor of DAGRequest in reverse order and apply function. + /// func: (const tipb::Executor &). + void traverseReverse(std::function && func) const; + +private: + void checkOrSetExecutorId(); + +public: + tipb::DAGRequest * dag_request; + + bool is_tree_based{false}; + + /// Hold the order of list based executors. + /// It is used to ensure that the order of Execution summary of list based executors is the same as the order of list based executors. + std::vector list_based_executors_order; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 8080d7e0f41..31a1b8c3e72 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -391,10 +391,7 @@ class TestTiRemoteBlockInputStream : public testing::Test // 3. send execution summary writer->add_summary = true; - - ExecutorStatisticsCollector statistics_collector(""); - statistics_collector.initialize(dag_context_ptr.get()); - auto summary_response = statistics_collector.genExecutionSummaryResponse(); + tipb::SelectResponse summary_response; writer->write(summary_response); } @@ -420,9 +417,7 @@ class TestTiRemoteBlockInputStream : public testing::Test // 3. send execution summary writer->add_summary = true; - ExecutorStatisticsCollector statistics_collector(""); - statistics_collector.initialize(dag_context_ptr.get()); - auto execution_summary_response = statistics_collector.genExecutionSummaryResponse(); + tipb::SelectResponse execution_summary_response; writer->write(execution_summary_response); } diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 4561922eaee..5752159f618 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -159,7 +158,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request) { auto tunnel_set_local = std::make_shared(log->identifier()); std::chrono::seconds timeout(task_request.timeout()); - const auto & exchange_sender = dag_req.root_executor().exchange_sender(); + const auto & exchange_sender = dag_context->dag_request.rootExecutor().exchange_sender(); for (int i = 0; i < exchange_sender.encoded_task_meta_size(); ++i) { @@ -192,7 +191,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request) void MPPTask::initExchangeReceivers() { auto receiver_set_local = std::make_shared(log->identifier()); - traverseExecutors(&dag_req, [&](const tipb::Executor & executor) { + dag_context->dag_request.traverse([&](const tipb::Executor & executor) { if (executor.tp() == tipb::ExecType::TypeExchangeReceiver) { assert(executor.has_executor_id()); diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index 13786383bac..302b64fff87 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -109,6 +109,7 @@ class MPPTask : public std::enable_shared_from_this void initExchangeReceivers(); + // To make sure dag_req is not destroyed before the mpp task ends. tipb::DAGRequest dag_req; mpp::TaskMeta meta; MPPTaskId id; diff --git a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp index 3f0268892e8..f131ff9b817 100644 --- a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp @@ -69,9 +69,9 @@ void MPPTaskStatistics::initializeExecutorDAG(DAGContext * dag_context_) assert(dag_context_); assert(dag_context_->isMPPTask()); dag_context = dag_context_; - RUNTIME_CHECK(dag_context->dag_request && dag_context->dag_request->has_root_executor()); - const auto & root_executor = dag_context->dag_request->root_executor(); - RUNTIME_CHECK(root_executor.has_exchange_sender()); + const auto & root_executor = dag_context->dag_request.rootExecutor(); + if unlikely (!root_executor.has_exchange_sender()) + throw TiFlashException("The root executor isn't ExchangeSender in MPP, which is unexpected.", Errors::Coprocessor::BadRequest); is_root = dag_context->isRootMPPTask(); sender_executor_id = root_executor.executor_id(); diff --git a/dbms/src/Flash/Planner/ExecutorIdGenerator.h b/dbms/src/Flash/Planner/ExecutorIdGenerator.h deleted file mode 100644 index bd0a74aad46..00000000000 --- a/dbms/src/Flash/Planner/ExecutorIdGenerator.h +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include - -#include - -namespace DB -{ -class ExecutorIdGenerator -{ -public: - String generate(const tipb::Executor & executor) - { - String executor_id = executor.has_executor_id() ? executor.executor_id() : doGenerate(executor); - assert(!executor_id.empty()); - RUNTIME_CHECK(ids.find(executor_id) == ids.end(), executor_id); - ids.insert(executor_id); - return executor_id; - } - -private: - String doGenerate(const tipb::Executor & executor) - { - assert(!executor.has_executor_id()); - switch (executor.tp()) - { - case tipb::ExecType::TypeSelection: - return fmt::format("selection_{}", current_id++); - case tipb::ExecType::TypeProjection: - return fmt::format("project_{}", current_id++); - case tipb::ExecType::TypeStreamAgg: - case tipb::ExecType::TypeAggregation: - return fmt::format("aggregation_{}", current_id++); - case tipb::ExecType::TypeTopN: - return fmt::format("topn_{}", current_id++); - case tipb::ExecType::TypeLimit: - return fmt::format("limit_{}", current_id++); - case tipb::ExecType::TypeExchangeSender: - return fmt::format("exchange_sender_{}", current_id++); - case tipb::ExecType::TypeExchangeReceiver: - return fmt::format("exchange_receiver_{}", current_id++); - case tipb::ExecType::TypeTableScan: - case tipb::ExecType::TypePartitionTableScan: - return fmt::format("table_scan_{}", current_id++); - case tipb::ExecType::TypeSort: - return fmt::format("sort_{}", current_id++); - case tipb::ExecType::TypeWindow: - return fmt::format("window_{}", current_id++); - case tipb::ExecType::TypeJoin: - return fmt::format("join_{}", current_id++); - default: - throw TiFlashException( - fmt::format("Unsupported executor in DAG request: {}", executor.DebugString()), - Errors::Planner::Unimplemented); - } - } - - UInt32 current_id = 0; - - std::unordered_set ids; -}; -} // namespace DB diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp index ec789c72497..b47d847ca2b 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp b/dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp index 170ca3ff8b8..0af9a203eba 100644 --- a/dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp +++ b/dbms/src/Flash/Statistics/ExecutionSummaryHelper.cpp @@ -32,7 +32,7 @@ void fillTiExecutionSummary( // tree-based executors will have executor_id. // In ut, list-based executor will have executor_id for result comparision. - if (dag_context.return_executor_id || force_fill_executor_id) + if (dag_context.dag_request.isTreeBased() || force_fill_executor_id) execution_summary->set_executor_id(executor_id); } } // namespace DB diff --git a/dbms/src/Flash/Statistics/ExecutorStatistics.h b/dbms/src/Flash/Statistics/ExecutorStatistics.h index f22e6eeeefd..94a0f6d549e 100644 --- a/dbms/src/Flash/Statistics/ExecutorStatistics.h +++ b/dbms/src/Flash/Statistics/ExecutorStatistics.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -40,7 +39,7 @@ class ExecutorStatistics : public ExecutorStatisticsBase ExecutorStatistics(const tipb::Executor * executor, DAGContext & dag_context_) : dag_context(dag_context_) { - RUNTIME_CHECK(executor->has_executor_id()); + assert(executor->has_executor_id()); executor_id = executor->executor_id(); type = ExecutorImpl::type; diff --git a/dbms/src/Flash/Statistics/ExecutorStatisticsCollector.cpp b/dbms/src/Flash/Statistics/ExecutorStatisticsCollector.cpp index ec7270c01a0..7c3f1b07a5d 100644 --- a/dbms/src/Flash/Statistics/ExecutorStatisticsCollector.cpp +++ b/dbms/src/Flash/Statistics/ExecutorStatisticsCollector.cpp @@ -24,6 +24,7 @@ #include #include #include + namespace DB { namespace @@ -65,70 +66,58 @@ void ExecutorStatisticsCollector::initialize(DAGContext * dag_context_) dag_context = dag_context_; assert(dag_context); - if likely (dag_context->dag_request) - { - traverseExecutors(dag_context->dag_request, [&](const tipb::Executor & executor) { - RUNTIME_CHECK(executor.has_executor_id()); - if (!append< - AggStatistics, - ExchangeReceiverStatistics, - ExchangeSenderStatistics, - FilterStatistics, - JoinStatistics, - LimitStatistics, - ProjectStatistics, - SortStatistics, - TableScanStatistics, - TopNStatistics, - WindowStatistics, - ExpandStatistics>(&executor)) - { - throw TiFlashException( - fmt::format("Unknown executor type, executor_id: {}", executor.executor_id()), - Errors::Coprocessor::Internal); - } - return true; - }); - - fillListBasedExecutorsChild(); - fillTreeBasedExecutorsChildren(); - } -} - -void ExecutorStatisticsCollector::fillListBasedExecutorsChild() -{ - if (dag_context->dag_request->executors_size() > 0) - { - // fill list-based executors child - auto size = dag_context->dag_request->executors_size(); - RUNTIME_CHECK(size > 0); - const auto & executors = dag_context->dag_request->executors(); - String child; - for (int i = 0; i < size; ++i) + dag_context->dag_request.traverse([&](const tipb::Executor & executor) { + assert(executor.has_executor_id()); + if (!append< + AggStatistics, + ExchangeReceiverStatistics, + ExchangeSenderStatistics, + FilterStatistics, + JoinStatistics, + LimitStatistics, + ProjectStatistics, + SortStatistics, + TableScanStatistics, + TopNStatistics, + WindowStatistics, + ExpandStatistics>(&executor)) { - const auto & executor_id = executors[i].executor_id(); - if (i != 0) - profiles[executor_id]->setChild(child); - child = executor_id; + throw TiFlashException( + fmt::format("Unknown executor type, executor_id: {}", executor.executor_id()), + Errors::Coprocessor::Internal); } - } + return true; + }); + + fillChildren(); } -void ExecutorStatisticsCollector::fillTreeBasedExecutorsChildren() +void ExecutorStatisticsCollector::fillChildren() { - if (dag_context->dag_request->has_root_executor()) + if (dag_context->dag_request.isTreeBased()) { - traverseExecutors(dag_context->dag_request, [&](const tipb::Executor & executor) { - // set children for tree-based executors + // set children for tree-based executors + dag_context->dag_request.traverse([&](const tipb::Executor & executor) { std::vector children; getChildren(executor).forEach([&](const tipb::Executor & child) { - RUNTIME_CHECK(child.has_executor_id()); + assert(child.has_executor_id()); children.push_back(child.executor_id()); }); profiles[executor.executor_id()]->setChildren(children); return true; }); } + else + { + // fill list-based executors child + std::optional child; + for (const auto & executor_id : dag_context->dag_request.list_based_executors_order) + { + if (child) + profiles[executor_id]->setChild(*child); + child = executor_id; + } + } } tipb::SelectResponse ExecutorStatisticsCollector::genExecutionSummaryResponse() @@ -178,7 +167,7 @@ void ExecutorStatisticsCollector::collectRuntimeDetails() void ExecutorStatisticsCollector::fillLocalExecutionSummaries(tipb::SelectResponse & response) { - if (dag_context->return_executor_id) + if (dag_context->dag_request.isTreeBased()) { // fill in tree-based executors' execution summary for (auto & p : profiles) @@ -192,8 +181,8 @@ void ExecutorStatisticsCollector::fillLocalExecutionSummaries(tipb::SelectRespon else { // fill in list-based executors' execution summary - RUNTIME_CHECK(profiles.size() == dag_context->list_based_executors_order.size()); - for (const auto & executor_id : dag_context->list_based_executors_order) + RUNTIME_CHECK(profiles.size() == dag_context->dag_request.list_based_executors_order.size()); + for (const auto & executor_id : dag_context->dag_request.list_based_executors_order) { auto it = profiles.find(executor_id); RUNTIME_CHECK(it != profiles.end()); diff --git a/dbms/src/Flash/Statistics/ExecutorStatisticsCollector.h b/dbms/src/Flash/Statistics/ExecutorStatisticsCollector.h index e0d29fc7ad1..6f4fa1d2ced 100644 --- a/dbms/src/Flash/Statistics/ExecutorStatisticsCollector.h +++ b/dbms/src/Flash/Statistics/ExecutorStatisticsCollector.h @@ -59,9 +59,7 @@ class ExecutorStatisticsCollector UInt64 join_build_time, const std::unordered_map & scan_context_map) const; - void fillListBasedExecutorsChild(); - void fillTreeBasedExecutorsChildren(); - + void fillChildren(); template bool appendImpl(const tipb::Executor * executor) @@ -77,8 +75,8 @@ class ExecutorStatisticsCollector template bool append(const tipb::Executor * executor) { - RUNTIME_CHECK(executor->has_executor_id()); - RUNTIME_CHECK(profiles.find(executor->executor_id()) == profiles.end()); + assert(executor->has_executor_id()); + assert(profiles.find(executor->executor_id()) == profiles.end()); return (appendImpl(executor) || ...); } diff --git a/dbms/src/Flash/executeQuery.cpp b/dbms/src/Flash/executeQuery.cpp index b4ac4bb6d69..f61494e51d7 100644 --- a/dbms/src/Flash/executeQuery.cpp +++ b/dbms/src/Flash/executeQuery.cpp @@ -137,7 +137,7 @@ std::optional executeAsPipeline(Context & context, bool intern FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); PhysicalPlan physical_plan{context, logger->identifier()}; - physical_plan.build(dag_context.dag_request); + physical_plan.build(dag_context.dag_request()); physical_plan.outputAndOptimize(); auto pipeline = physical_plan.toPipeline(); auto executor = std::make_unique(memory_tracker, context, logger->identifier(), pipeline); diff --git a/dbms/src/Storages/StorageDisaggregated.cpp b/dbms/src/Storages/StorageDisaggregated.cpp index baed730eea1..82ed8e7bb34 100644 --- a/dbms/src/Storages/StorageDisaggregated.cpp +++ b/dbms/src/Storages/StorageDisaggregated.cpp @@ -157,7 +157,7 @@ RequestAndRegionIDs StorageDisaggregated::buildDispatchMPPTaskRequest( std::vector region_ids = RequestUtils::setUpRegionInfos(batch_cop_task, dispatch_req); const auto & sender_target_task_meta = context.getDAGContext()->getMPPTaskMeta(); - const auto * dag_req = context.getDAGContext()->dag_request; + const auto * dag_req = context.getDAGContext()->dag_request(); tipb::DAGRequest sender_dag_req; sender_dag_req.set_time_zone_name(dag_req->time_zone_name()); sender_dag_req.set_time_zone_offset(dag_req->time_zone_offset()); diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index 8a9d2fce068..8de77ea9bcc 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -308,7 +308,7 @@ StorageDisaggregated::buildDisaggregatedTaskForNode( { // Setup the encoded plan - const auto * dag_req = context.getDAGContext()->dag_request; + const auto * dag_req = context.getDAGContext()->dag_request(); tipb::DAGRequest table_scan_req; table_scan_req.set_time_zone_name(dag_req->time_zone_name()); table_scan_req.set_time_zone_offset(dag_req->time_zone_offset());