From 3e1690a0c7cbb0f3c626f0497591f624bacfd627 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 1 Aug 2023 11:08:37 +0800 Subject: [PATCH 1/5] This is an automated cherry-pick of #7854 Signed-off-by: ti-chi-bot --- dbms/src/Flash/executeQuery.cpp | 189 +++++++++++++++++++++++++ dbms/src/Interpreters/executeQuery.cpp | 62 ++++++++ dbms/src/Interpreters/executeQuery.h | 6 + 3 files changed, 257 insertions(+) create mode 100644 dbms/src/Flash/executeQuery.cpp diff --git a/dbms/src/Flash/executeQuery.cpp b/dbms/src/Flash/executeQuery.cpp new file mode 100644 index 00000000000..4b708a0131e --- /dev/null +++ b/dbms/src/Flash/executeQuery.cpp @@ -0,0 +1,189 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ProfileEvents +{ +extern const Event Query; +} + +namespace DB +{ +namespace FailPoints +{ +extern const char random_interpreter_failpoint[]; +} // namespace FailPoints + +namespace +{ +void prepareForExecute(Context & context) +{ + ProfileEvents::increment(ProfileEvents::Query); + context.setQueryContext(context); + + QuotaForIntervals & quota = context.getQuota(); + quota.addQuery(); /// NOTE Seems that when new time interval has come, first query is not accounted in number of queries. + quota.checkExceeded(time(nullptr)); +} + +ProcessList::EntryPtr getProcessListEntry(Context & context, DAGContext & dag_context) +{ + if (dag_context.is_mpp_task) + { + /// for MPPTask, process list entry is set in MPPTask::initProcessListEntry() + RUNTIME_ASSERT(dag_context.getProcessListEntry() != nullptr, "process list entry for MPP task must not be nullptr"); + return dag_context.getProcessListEntry(); + } + else + { + RUNTIME_ASSERT(dag_context.getProcessListEntry() == nullptr, "process list entry for non-MPP must be nullptr"); + auto process_list_entry = setProcessListElement( + context, + dag_context.dummy_query_string, + dag_context.dummy_ast.get(), + true); + dag_context.setProcessListEntry(process_list_entry); + return process_list_entry; + } +} + +QueryExecutorPtr doExecuteAsBlockIO(IQuerySource & dag, Context & context, bool internal) +{ + RUNTIME_ASSERT(context.getDAGContext()); + auto & dag_context = *context.getDAGContext(); + const auto & logger = dag_context.log; + RUNTIME_ASSERT(logger); + + prepareForExecute(context); + + ProcessList::EntryPtr process_list_entry; + if (likely(!internal)) + { + process_list_entry = getProcessListEntry(context, dag_context); + logQuery(dag.str(context.getSettingsRef().log_queries_cut_to_length), context, logger); + } + + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); + auto interpreter = dag.interpreter(context, QueryProcessingStage::Complete); + BlockIO res = interpreter->execute(); + MemoryTrackerPtr memory_tracker; + if (likely(process_list_entry)) + { + (*process_list_entry)->setQueryStreams(res); + memory_tracker = (*process_list_entry)->getMemoryTrackerPtr(); + } + + /// Hold element of process list till end of query execution. + res.process_list_entry = process_list_entry; + + if (likely(!internal)) + logQueryPipeline(logger, res.in); + + dag_context.switchToStreamMode(); + return std::make_unique(memory_tracker, context, logger->identifier(), res.in); +} + +std::optional executeAsPipeline(Context & context, bool internal) +{ + RUNTIME_ASSERT(context.getDAGContext()); + auto & dag_context = *context.getDAGContext(); + const auto & logger = dag_context.log; + RUNTIME_ASSERT(logger); + + if unlikely (!TaskScheduler::instance) + { + LOG_WARNING(logger, "The task scheduler of the pipeline model has not been initialized, which is an exception. It is necessary to restart the TiFlash node."); + return {}; + } + if (!Pipeline::isSupported(*dag_context.dag_request, context.getSettingsRef())) + { + LOG_DEBUG(logger, "Can't executed by pipeline model due to unsupported operator, and then fallback to block inputstream model"); + return {}; + } + + prepareForExecute(context); + + ProcessList::EntryPtr process_list_entry; + if (likely(!internal)) + { + process_list_entry = getProcessListEntry(context, dag_context); + logQuery(dag_context.dummy_query_string, context, logger); + } + + MemoryTrackerPtr memory_tracker; + if (likely(process_list_entry)) + memory_tracker = (*process_list_entry)->getMemoryTrackerPtr(); + + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); + auto executor = std::make_unique(memory_tracker, context, logger->identifier()); + if (likely(!internal)) + LOG_INFO(logger, fmt::format("Query pipeline:\n{}", executor->toString())); + dag_context.switchToPipelineMode(); + return {std::move(executor)}; +} + +QueryExecutorPtr executeAsBlockIO(Context & context, bool internal) +{ + if (context.getSettingsRef().enable_planner) + { + PlanQuerySource plan(context); + return doExecuteAsBlockIO(plan, context, internal); + } + else + { + DAGQuerySource dag(context); + return doExecuteAsBlockIO(dag, context, internal); + } +} +} // namespace + +QueryExecutorPtr queryExecute(Context & context, bool internal) +{ + if (context.getSettingsRef().enforce_enable_pipeline) + { + RUNTIME_CHECK_MSG( + TaskScheduler::instance, + "The task scheduler of the pipeline model has not been initialized, which is an exception. It is necessary to restart the TiFlash node."); + auto res = executeAsPipeline(context, internal); + RUNTIME_CHECK_MSG(res, "Failed to execute query using pipeline model, and an error is reported because the setting enforce_enable_pipeline is true."); + return std::move(*res); + } + if (context.getSettingsRef().enable_planner + && context.getSettingsRef().enable_pipeline) + { + if (auto res = executeAsPipeline(context, internal); res) + return std::move(*res); + } + return executeAsBlockIO(context, internal); +} +} // namespace DB diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 6c96e7c22ad..81928beddfa 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -158,6 +158,18 @@ void onExceptionBeforeStart(const String & query, Context & context, time_t curr } } +void prepareForInputStream( + Context & context, + const BlockInputStreamPtr & in) +{ + assert(in); + if (auto * stream = dynamic_cast(in.get())) + { + stream->setProgressCallback(context.getProgressCallback()); + stream->setProcessListElement(context.getProcessListElement()); + } +} + std::tuple executeQueryImpl( IQuerySource & query_src, Context & context, @@ -408,6 +420,56 @@ std::tuple executeQueryImpl( } } // namespace +<<<<<<< HEAD +======= +/// Log query into text log (not into system table). +void logQuery(const String & query, const Context & context, const LoggerPtr & logger) +{ + const auto & current_query_id = context.getClientInfo().current_query_id; + const auto & initial_query_id = context.getClientInfo().initial_query_id; + const auto & current_user = context.getClientInfo().current_user; + + LOG_DEBUG( + logger, + "(from {}{}, query_id: {}{}) {}", + context.getClientInfo().current_address.toString(), + (current_user != "default" ? ", user: " + current_user : ""), + current_query_id, + (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : ""), + joinLines(query)); +} + +std::shared_ptr setProcessListElement( + Context & context, + const String & query, + const IAST * ast, + bool is_dag_task) +{ + assert(ast); + auto total_memory = context.getServerInfo().has_value() ? context.getServerInfo()->memory_info.capacity : 0; + auto process_list_entry = context.getProcessList().insert( + query, + ast, + context.getClientInfo(), + context.getSettingsRef(), + total_memory, + is_dag_task); + context.setProcessListElement(&process_list_entry->get()); + return process_list_entry; +} + +void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in) +{ + assert(in); + auto pipeline_log_str = [&in]() { + FmtBuffer log_buffer; + log_buffer.append("Query pipeline:\n"); + in->dumpTree(log_buffer); + return log_buffer.toString(); + }; + LOG_INFO(logger, pipeline_log_str()); +} +>>>>>>> 80f8e9dc65 (fix issue #7810 (#7854)) BlockIO executeQuery( const String & query, diff --git a/dbms/src/Interpreters/executeQuery.h b/dbms/src/Interpreters/executeQuery.h index ea5d70a7a91..e28eb4ae7df 100644 --- a/dbms/src/Interpreters/executeQuery.h +++ b/dbms/src/Interpreters/executeQuery.h @@ -53,6 +53,12 @@ BlockIO executeQuery( ); +<<<<<<< HEAD BlockIO executeQuery(DAGQuerySource & dag, Context & context, bool internal, QueryProcessingStage::Enum stage); +======= +void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in); + +void logQuery(const String & query, const Context & context, const LoggerPtr & logger); +>>>>>>> 80f8e9dc65 (fix issue #7810 (#7854)) } // namespace DB From 24ba39e99e8729876a7bd9715b2f118cc39cc0eb Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 4 Dec 2023 14:04:45 +0800 Subject: [PATCH 2/5] Delete dbms/src/Flash/executeQuery.cpp --- dbms/src/Flash/executeQuery.cpp | 189 -------------------------------- 1 file changed, 189 deletions(-) delete mode 100644 dbms/src/Flash/executeQuery.cpp diff --git a/dbms/src/Flash/executeQuery.cpp b/dbms/src/Flash/executeQuery.cpp deleted file mode 100644 index 4b708a0131e..00000000000 --- a/dbms/src/Flash/executeQuery.cpp +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace ProfileEvents -{ -extern const Event Query; -} - -namespace DB -{ -namespace FailPoints -{ -extern const char random_interpreter_failpoint[]; -} // namespace FailPoints - -namespace -{ -void prepareForExecute(Context & context) -{ - ProfileEvents::increment(ProfileEvents::Query); - context.setQueryContext(context); - - QuotaForIntervals & quota = context.getQuota(); - quota.addQuery(); /// NOTE Seems that when new time interval has come, first query is not accounted in number of queries. - quota.checkExceeded(time(nullptr)); -} - -ProcessList::EntryPtr getProcessListEntry(Context & context, DAGContext & dag_context) -{ - if (dag_context.is_mpp_task) - { - /// for MPPTask, process list entry is set in MPPTask::initProcessListEntry() - RUNTIME_ASSERT(dag_context.getProcessListEntry() != nullptr, "process list entry for MPP task must not be nullptr"); - return dag_context.getProcessListEntry(); - } - else - { - RUNTIME_ASSERT(dag_context.getProcessListEntry() == nullptr, "process list entry for non-MPP must be nullptr"); - auto process_list_entry = setProcessListElement( - context, - dag_context.dummy_query_string, - dag_context.dummy_ast.get(), - true); - dag_context.setProcessListEntry(process_list_entry); - return process_list_entry; - } -} - -QueryExecutorPtr doExecuteAsBlockIO(IQuerySource & dag, Context & context, bool internal) -{ - RUNTIME_ASSERT(context.getDAGContext()); - auto & dag_context = *context.getDAGContext(); - const auto & logger = dag_context.log; - RUNTIME_ASSERT(logger); - - prepareForExecute(context); - - ProcessList::EntryPtr process_list_entry; - if (likely(!internal)) - { - process_list_entry = getProcessListEntry(context, dag_context); - logQuery(dag.str(context.getSettingsRef().log_queries_cut_to_length), context, logger); - } - - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); - auto interpreter = dag.interpreter(context, QueryProcessingStage::Complete); - BlockIO res = interpreter->execute(); - MemoryTrackerPtr memory_tracker; - if (likely(process_list_entry)) - { - (*process_list_entry)->setQueryStreams(res); - memory_tracker = (*process_list_entry)->getMemoryTrackerPtr(); - } - - /// Hold element of process list till end of query execution. - res.process_list_entry = process_list_entry; - - if (likely(!internal)) - logQueryPipeline(logger, res.in); - - dag_context.switchToStreamMode(); - return std::make_unique(memory_tracker, context, logger->identifier(), res.in); -} - -std::optional executeAsPipeline(Context & context, bool internal) -{ - RUNTIME_ASSERT(context.getDAGContext()); - auto & dag_context = *context.getDAGContext(); - const auto & logger = dag_context.log; - RUNTIME_ASSERT(logger); - - if unlikely (!TaskScheduler::instance) - { - LOG_WARNING(logger, "The task scheduler of the pipeline model has not been initialized, which is an exception. It is necessary to restart the TiFlash node."); - return {}; - } - if (!Pipeline::isSupported(*dag_context.dag_request, context.getSettingsRef())) - { - LOG_DEBUG(logger, "Can't executed by pipeline model due to unsupported operator, and then fallback to block inputstream model"); - return {}; - } - - prepareForExecute(context); - - ProcessList::EntryPtr process_list_entry; - if (likely(!internal)) - { - process_list_entry = getProcessListEntry(context, dag_context); - logQuery(dag_context.dummy_query_string, context, logger); - } - - MemoryTrackerPtr memory_tracker; - if (likely(process_list_entry)) - memory_tracker = (*process_list_entry)->getMemoryTrackerPtr(); - - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); - auto executor = std::make_unique(memory_tracker, context, logger->identifier()); - if (likely(!internal)) - LOG_INFO(logger, fmt::format("Query pipeline:\n{}", executor->toString())); - dag_context.switchToPipelineMode(); - return {std::move(executor)}; -} - -QueryExecutorPtr executeAsBlockIO(Context & context, bool internal) -{ - if (context.getSettingsRef().enable_planner) - { - PlanQuerySource plan(context); - return doExecuteAsBlockIO(plan, context, internal); - } - else - { - DAGQuerySource dag(context); - return doExecuteAsBlockIO(dag, context, internal); - } -} -} // namespace - -QueryExecutorPtr queryExecute(Context & context, bool internal) -{ - if (context.getSettingsRef().enforce_enable_pipeline) - { - RUNTIME_CHECK_MSG( - TaskScheduler::instance, - "The task scheduler of the pipeline model has not been initialized, which is an exception. It is necessary to restart the TiFlash node."); - auto res = executeAsPipeline(context, internal); - RUNTIME_CHECK_MSG(res, "Failed to execute query using pipeline model, and an error is reported because the setting enforce_enable_pipeline is true."); - return std::move(*res); - } - if (context.getSettingsRef().enable_planner - && context.getSettingsRef().enable_pipeline) - { - if (auto res = executeAsPipeline(context, internal); res) - return std::move(*res); - } - return executeAsBlockIO(context, internal); -} -} // namespace DB From 718d06b31b4177f63c76aec64a7c6e04f8c9da91 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 4 Dec 2023 14:05:30 +0800 Subject: [PATCH 3/5] Update executeQuery.h --- dbms/src/Interpreters/executeQuery.h | 6 ------ 1 file changed, 6 deletions(-) diff --git a/dbms/src/Interpreters/executeQuery.h b/dbms/src/Interpreters/executeQuery.h index e28eb4ae7df..ea5d70a7a91 100644 --- a/dbms/src/Interpreters/executeQuery.h +++ b/dbms/src/Interpreters/executeQuery.h @@ -53,12 +53,6 @@ BlockIO executeQuery( ); -<<<<<<< HEAD BlockIO executeQuery(DAGQuerySource & dag, Context & context, bool internal, QueryProcessingStage::Enum stage); -======= -void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in); - -void logQuery(const String & query, const Context & context, const LoggerPtr & logger); ->>>>>>> 80f8e9dc65 (fix issue #7810 (#7854)) } // namespace DB From adb6c87afd9ab111b28d082d05dd24bb0dd17cce Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 4 Dec 2023 14:07:35 +0800 Subject: [PATCH 4/5] Update executeQuery.cpp --- dbms/src/Interpreters/executeQuery.cpp | 63 -------------------------- 1 file changed, 63 deletions(-) diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 81928beddfa..37d4f92b45f 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -158,18 +158,6 @@ void onExceptionBeforeStart(const String & query, Context & context, time_t curr } } -void prepareForInputStream( - Context & context, - const BlockInputStreamPtr & in) -{ - assert(in); - if (auto * stream = dynamic_cast(in.get())) - { - stream->setProgressCallback(context.getProgressCallback()); - stream->setProcessListElement(context.getProcessListElement()); - } -} - std::tuple executeQueryImpl( IQuerySource & query_src, Context & context, @@ -420,57 +408,6 @@ std::tuple executeQueryImpl( } } // namespace -<<<<<<< HEAD -======= -/// Log query into text log (not into system table). -void logQuery(const String & query, const Context & context, const LoggerPtr & logger) -{ - const auto & current_query_id = context.getClientInfo().current_query_id; - const auto & initial_query_id = context.getClientInfo().initial_query_id; - const auto & current_user = context.getClientInfo().current_user; - - LOG_DEBUG( - logger, - "(from {}{}, query_id: {}{}) {}", - context.getClientInfo().current_address.toString(), - (current_user != "default" ? ", user: " + current_user : ""), - current_query_id, - (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : ""), - joinLines(query)); -} - -std::shared_ptr setProcessListElement( - Context & context, - const String & query, - const IAST * ast, - bool is_dag_task) -{ - assert(ast); - auto total_memory = context.getServerInfo().has_value() ? context.getServerInfo()->memory_info.capacity : 0; - auto process_list_entry = context.getProcessList().insert( - query, - ast, - context.getClientInfo(), - context.getSettingsRef(), - total_memory, - is_dag_task); - context.setProcessListElement(&process_list_entry->get()); - return process_list_entry; -} - -void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in) -{ - assert(in); - auto pipeline_log_str = [&in]() { - FmtBuffer log_buffer; - log_buffer.append("Query pipeline:\n"); - in->dumpTree(log_buffer); - return log_buffer.toString(); - }; - LOG_INFO(logger, pipeline_log_str()); -} ->>>>>>> 80f8e9dc65 (fix issue #7810 (#7854)) - BlockIO executeQuery( const String & query, Context & context, From 1432578f7836c037dce887c01246852e44c48223 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 4 Dec 2023 14:12:15 +0800 Subject: [PATCH 5/5] Update executeQuery.cpp --- dbms/src/Interpreters/executeQuery.cpp | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 37d4f92b45f..a8715aa7e9a 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -240,19 +240,22 @@ std::tuple executeQueryImpl( { if (auto * stream = dynamic_cast(res.in.get())) { - stream->setProgressCallback(context.getProgressCallback()); - stream->setProcessListElement(context.getProcessListElement()); - - /// Limits on the result, the quota on the result, and also callback for progress. - /// Limits apply only to the final result. - if (stage == QueryProcessingStage::Complete) + if (auto * dag_src = dynamic_cast(&query_src); dag_src == nullptr) { - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT; - limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); + stream->setProgressCallback(context.getProgressCallback()); + stream->setProcessListElement(context.getProcessListElement()); + + /// Limits on the result, the quota on the result, and also callback for progress. + /// Limits apply only to the final result. + if (stage == QueryProcessingStage::Complete) + { + IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT; + limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); - stream->setLimits(limits); - stream->setQuota(quota); + stream->setLimits(limits); + stream->setQuota(quota); + } } } } @@ -408,6 +411,7 @@ std::tuple executeQueryImpl( } } // namespace + BlockIO executeQuery( const String & query, Context & context,