From 32919d18dcd242ecfafceeca9490014a9883e91a Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 27 Sep 2022 16:41:43 +0800 Subject: [PATCH 1/4] update --- dbms/src/Flash/Coprocessor/DAGDriver.cpp | 2 +- dbms/src/Flash/executeQuery.cpp | 81 +++++++++++- dbms/src/Flash/executeQuery.h | 7 +- dbms/src/Interpreters/Context.cpp | 5 - dbms/src/Interpreters/Context.h | 1 - dbms/src/Interpreters/executeQuery.cpp | 159 +++++++++++------------ dbms/src/Interpreters/executeQuery.h | 13 +- dbms/src/TestUtils/ExecutorTestUtils.cpp | 4 +- 8 files changed, 165 insertions(+), 107 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index 5df5c95c6e0..28f2f12b6f2 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -90,7 +90,7 @@ try auto start_time = Clock::now(); DAGContext & dag_context = *context.getDAGContext(); - BlockIO streams = executeQuery(context, internal, QueryProcessingStage::Complete); + BlockIO streams = executeQuery(context, internal); if (!streams.in || streams.out) // Only query is allowed, so streams.in must not be null and streams.out must be null throw TiFlashException("DAG is not query.", Errors::Coprocessor::Internal); diff --git a/dbms/src/Flash/executeQuery.cpp b/dbms/src/Flash/executeQuery.cpp index 903409c4118..2596b2ebc31 100644 --- a/dbms/src/Flash/executeQuery.cpp +++ b/dbms/src/Flash/executeQuery.cpp @@ -12,27 +12,96 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include #include #include #include +#include +#include +#include #include +namespace ProfileEvents +{ +extern const Event Query; +} + namespace DB { -BlockIO executeQuery( - Context & context, - bool internal, - QueryProcessingStage::Enum stage) +namespace FailPoints +{ +extern const char random_interpreter_failpoint[]; +} // namespace FailPoints +namespace +{ +void prepareForExecute(Context & context) +{ + ProfileEvents::increment(ProfileEvents::Query); + context.setQueryContext(context); + + QuotaForIntervals & quota = context.getQuota(); + quota.addQuery(); /// NOTE Seems that when new time interval has come, first query is not accounted in number of queries. + quota.checkExceeded(time(nullptr)); +} + +BlockIO executeDAG(IQuerySource & dag, Context & context, bool internal) +{ + RUNTIME_ASSERT(context.getDAGContext()); + auto & dag_context = *context.getDAGContext(); + const auto & logger = dag_context.log; + RUNTIME_ASSERT(logger); + + prepareForExecute(context); + + if (!internal) + logQuery(dag.str(context.getSettingsRef().log_queries_cut_to_length), context, logger); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); + auto interpreter = dag.interpreter(context, QueryProcessingStage::Complete); + BlockIO res = interpreter->execute(); + + // For internal == true, no need to set up the resource usage tracker. + if (!internal) + { + /// Hold element of process list till end of query execution. + if (dag_context.is_mpp_task) + { + /// for MPPTask, process list entry is created in MPPTask::prepare() + RUNTIME_ASSERT(dag_context.getProcessListEntry() != nullptr, "process list entry for MPP task must not be nullptr"); + res.process_list_entry = dag_context.getProcessListEntry(); + } + else + { + RUNTIME_ASSERT(dag_context.getProcessListEntry() == nullptr, "process list entry for non-MPP must be nullptr"); + res.process_list_entry = setProcessListElement( + context, + dag_context.dummy_query_string, + dag_context.dummy_ast.get()); + } + (*res.process_list_entry)->setQueryStreams(res); + } + + prepareForInputStream(context, QueryProcessingStage::Complete, res.in); + if (!internal) + logQueryPipeline(logger, res.in); + + dag_context.attachBlockIO(res); + return res; +} +} // namespace + +BlockIO executeQuery(Context & context, bool internal) { if (context.getSettingsRef().enable_planner) { PlanQuerySource plan(context); - return executeQuery(plan, context, internal, stage); + return executeDAG(plan, context, internal); } else { DAGQuerySource dag(context); - return executeQuery(dag, context, internal, stage); + return executeDAG(dag, context, internal); } } } // namespace DB diff --git a/dbms/src/Flash/executeQuery.h b/dbms/src/Flash/executeQuery.h index a8b56f765d0..0494ad5b24d 100644 --- a/dbms/src/Flash/executeQuery.h +++ b/dbms/src/Flash/executeQuery.h @@ -20,8 +20,5 @@ namespace DB { -BlockIO executeQuery( - Context & context, - bool internal = false, - QueryProcessingStage::Enum stage = QueryProcessingStage::Complete); -} +BlockIO executeQuery(Context & context, bool internal = false); +} // namespace DB diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index dcb1e41202e..3cb6cb1d5ad 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1223,11 +1223,6 @@ DAGContext * Context::getDAGContext() const return dag_context; } -bool Context::isMPPTask() const -{ - return dag_context != nullptr && dag_context->is_mpp_task; -} - void Context::setUncompressedCache(size_t max_size_in_bytes) { auto lock = getLock(); diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 190017b87bf..9b845371dc1 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -344,7 +344,6 @@ class Context void setDAGContext(DAGContext * dag_context); DAGContext * getDAGContext() const; - bool isMPPTask() const; /// List all queries. ProcessList & getProcessList(); diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 03a04f50fa5..d80800deee4 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -22,8 +22,6 @@ #include #include #include -#include -#include #include #include #include @@ -87,23 +85,6 @@ LoggerPtr getLogger(const Context & context) : Logger::get("executeQuery"); } -/// Log query into text log (not into system table). -void logQuery(const String & query, const Context & context, const LoggerPtr & logger) -{ - const auto & current_query_id = context.getClientInfo().current_query_id; - const auto & initial_query_id = context.getClientInfo().initial_query_id; - const auto & current_user = context.getClientInfo().current_user; - - LOG_FMT_DEBUG( - logger, - "(from {}{}, query_id: {}{}) {}", - context.getClientInfo().current_address.toString(), - (current_user != "default" ? ", user: " + current_user : ""), - current_query_id, - (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : ""), - joinLines(query)); -} - /// Call this inside catch block. void setExceptionStackTrace(QueryLogElement & elem) @@ -222,41 +203,9 @@ std::tuple executeQueryImpl( /// Put query to process list. But don't put SHOW PROCESSLIST query itself. ProcessList::EntryPtr process_list_entry; - if (!internal && !context.isMPPTask() && nullptr == typeid_cast(&*ast)) + if (!internal && nullptr == typeid_cast(&*ast)) { - process_list_entry = context.getProcessList().insert( - query, - ast.get(), - context.getClientInfo(), - settings); - - context.setProcessListElement(&process_list_entry->get()); - } - if (context.isMPPTask()) - { - if (!context.isTest()) - { - /// for MPPTask, process list entry is created in MPPTask::prepare() - RUNTIME_ASSERT(context.getDAGContext()->getProcessListEntry() != nullptr, "process list entry for MPP task must not be nullptr"); - process_list_entry = context.getDAGContext()->getProcessListEntry(); - } - else - { - /// it is possible that in test mode, the process list entry is nullptr because some tests run mpp query - /// just based on dag request, there is even no MPPTask at all. - if (context.getDAGContext()->getProcessListEntry() == nullptr) - { - process_list_entry = context.getProcessList().insert( - query, - ast.get(), - context.getClientInfo(), - settings); - - context.setProcessListElement(&process_list_entry->get()); - } - else - process_list_entry = context.getDAGContext()->getProcessListEntry(); - } + process_list_entry = setProcessListElement(context, query, ast.get()); } FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); @@ -272,23 +221,7 @@ std::tuple executeQueryImpl( if (res.in) { - if (auto * stream = dynamic_cast(res.in.get())) - { - stream->setProgressCallback(context.getProgressCallback()); - stream->setProcessListElement(context.getProcessListElement()); - - /// Limits on the result, the quota on the result, and also callback for progress. - /// Limits apply only to the final result. - if (stage == QueryProcessingStage::Complete) - { - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT; - limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); - - stream->setLimits(limits); - stream->setQuota(quota); - } - } + prepareForInputStream(context, stage, res.in); } if (res.out) @@ -420,13 +353,7 @@ std::tuple executeQueryImpl( if (!internal && res.in) { - auto pipeline_log_str = [&res]() { - FmtBuffer log_buffer; - log_buffer.append("Query pipeline:\n"); - res.in->dumpTree(log_buffer); - return log_buffer.toString(); - }; - LOG_DEBUG(execute_query_logger, pipeline_log_str()); + logQueryPipeline(execute_query_logger, res.in); } } } @@ -442,6 +369,75 @@ std::tuple executeQueryImpl( } } // namespace +/// Log query into text log (not into system table). +void logQuery(const String & query, const Context & context, const LoggerPtr & logger) +{ + const auto & current_query_id = context.getClientInfo().current_query_id; + const auto & initial_query_id = context.getClientInfo().initial_query_id; + const auto & current_user = context.getClientInfo().current_user; + + LOG_FMT_DEBUG( + logger, + "(from {}{}, query_id: {}{}) {}", + context.getClientInfo().current_address.toString(), + (current_user != "default" ? ", user: " + current_user : ""), + current_query_id, + (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : ""), + joinLines(query)); +} + +void prepareForInputStream( + Context & context, + QueryProcessingStage::Enum stage, + const BlockInputStreamPtr & in) +{ + assert(in); + if (auto * stream = dynamic_cast(in.get())) + { + stream->setProgressCallback(context.getProgressCallback()); + stream->setProcessListElement(context.getProcessListElement()); + + /// Limits on the result, the quota on the result, and also callback for progress. + /// Limits apply only to the final result. + if (stage == QueryProcessingStage::Complete) + { + IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT; + const auto & settings = context.getSettingsRef(); + limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); + + stream->setLimits(limits); + stream->setQuota(context.getQuota()); + } + } +} + +std::shared_ptr setProcessListElement( + Context & context, + const String & query, + const IAST * ast) +{ + assert(ast); + auto process_list_entry = context.getProcessList().insert( + query, + ast, + context.getClientInfo(), + context.getSettingsRef()); + context.setProcessListElement(&process_list_entry->get()); + return process_list_entry; +} + +void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in) +{ + assert(in); + auto pipeline_log_str = [&in]() { + FmtBuffer log_buffer; + log_buffer.append("Query pipeline:\n"); + in->dumpTree(log_buffer); + return log_buffer.toString(); + }; + LOG_DEBUG(logger, pipeline_log_str()); +} BlockIO executeQuery( const String & query, @@ -456,15 +452,6 @@ BlockIO executeQuery( } -BlockIO executeQuery(IQuerySource & dag, Context & context, bool internal, QueryProcessingStage::Enum stage) -{ - BlockIO streams; - std::tie(std::ignore, streams) = executeQueryImpl(dag, context, internal, stage); - context.getDAGContext()->attachBlockIO(streams); - return streams; -} - - void executeQuery( ReadBuffer & istr, WriteBuffer & ostr, diff --git a/dbms/src/Interpreters/executeQuery.h b/dbms/src/Interpreters/executeQuery.h index 22950368db8..b326f9602ba 100644 --- a/dbms/src/Interpreters/executeQuery.h +++ b/dbms/src/Interpreters/executeQuery.h @@ -52,7 +52,18 @@ BlockIO executeQuery( QueryProcessingStage::Enum stage = QueryProcessingStage::Complete /// To which stage the query must be executed. ); +std::shared_ptr setProcessListElement( + Context & context, + const String & query, + const IAST * ast); -BlockIO executeQuery(IQuerySource & dag, Context & context, bool internal, QueryProcessingStage::Enum stage); +void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in); + +void logQuery(const String & query, const Context & context, const LoggerPtr & logger); + +void prepareForInputStream( + Context & context, + QueryProcessingStage::Enum stage, + const BlockInputStreamPtr & in); } // namespace DB diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index a81f8a73276..1e1a62b5d5d 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -98,7 +98,7 @@ void ExecutorTest::executeInterpreter(const String & expected_string, const std: context.context.setDAGContext(&dag_context); context.context.setExecutorTest(); // Currently, don't care about regions information in interpreter tests. - auto res = executeQuery(context.context); + auto res = executeQuery(context.context, /*internal=*/true); FmtBuffer fb; res.in->dumpTree(fb); ASSERT_EQ(Poco::trim(expected_string), Poco::trim(fb.toString())); @@ -227,7 +227,7 @@ DB::ColumnsWithTypeAndName ExecutorTest::executeStreams(const std::shared_ptr & actual) From dd39f74660002984e9260e479392260bf2cc1b8e Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 27 Sep 2022 17:32:07 +0800 Subject: [PATCH 2/4] u --- dbms/src/Flash/Mpp/MPPTask.cpp | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 3750e7c19aa..b42f30934fb 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -28,7 +28,7 @@ #include #include #include -#include +#include #include #include #include @@ -281,14 +281,7 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request) dag_context->tidb_host = context->getClientInfo().current_address.toString(); context->setDAGContext(dag_context.get()); - - process_list_entry = context->getProcessList().insert( - dag_context->dummy_query_string, - dag_context->dummy_ast.get(), - context->getClientInfo(), - context->getSettingsRef()); - - context->setProcessListElement(&process_list_entry->get()); + process_list_entry = setProcessListElement(*context, dag_context->dummy_query_string, dag_context->dummy_ast.get()); dag_context->setProcessListEntry(process_list_entry); if (dag_context->isRootMPPTask()) From 1d10d4279b026d607200a9ea32a868ac630fb6e2 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 27 Sep 2022 17:48:24 +0800 Subject: [PATCH 3/4] fix --- dbms/src/Flash/Mpp/MPPTask.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index b42f30934fb..478427a4fc6 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include From 979b66be8b21a920ea929c5f3cb9c7f626f25993 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 27 Sep 2022 18:39:21 +0800 Subject: [PATCH 4/4] address comment --- dbms/src/Flash/executeQuery.cpp | 51 +++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/dbms/src/Flash/executeQuery.cpp b/dbms/src/Flash/executeQuery.cpp index 2596b2ebc31..46ed6688506 100644 --- a/dbms/src/Flash/executeQuery.cpp +++ b/dbms/src/Flash/executeQuery.cpp @@ -46,6 +46,24 @@ void prepareForExecute(Context & context) quota.checkExceeded(time(nullptr)); } +ProcessList::EntryPtr getProcessListEntry(Context & context, const DAGContext & dag_context) +{ + if (dag_context.is_mpp_task) + { + /// for MPPTask, process list entry is created in MPPTask::prepare() + RUNTIME_ASSERT(dag_context.getProcessListEntry() != nullptr, "process list entry for MPP task must not be nullptr"); + return dag_context.getProcessListEntry(); + } + else + { + RUNTIME_ASSERT(dag_context.getProcessListEntry() == nullptr, "process list entry for non-MPP must be nullptr"); + return setProcessListElement( + context, + dag_context.dummy_query_string, + dag_context.dummy_ast.get()); + } +} + BlockIO executeDAG(IQuerySource & dag, Context & context, bool internal) { RUNTIME_ASSERT(context.getDAGContext()); @@ -55,35 +73,24 @@ BlockIO executeDAG(IQuerySource & dag, Context & context, bool internal) prepareForExecute(context); - if (!internal) + ProcessList::EntryPtr process_list_entry; + if (likely(!internal)) + { + process_list_entry = getProcessListEntry(context, dag_context); logQuery(dag.str(context.getSettingsRef().log_queries_cut_to_length), context, logger); + } + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); auto interpreter = dag.interpreter(context, QueryProcessingStage::Complete); BlockIO res = interpreter->execute(); + if (likely(process_list_entry)) + (*process_list_entry)->setQueryStreams(res); - // For internal == true, no need to set up the resource usage tracker. - if (!internal) - { - /// Hold element of process list till end of query execution. - if (dag_context.is_mpp_task) - { - /// for MPPTask, process list entry is created in MPPTask::prepare() - RUNTIME_ASSERT(dag_context.getProcessListEntry() != nullptr, "process list entry for MPP task must not be nullptr"); - res.process_list_entry = dag_context.getProcessListEntry(); - } - else - { - RUNTIME_ASSERT(dag_context.getProcessListEntry() == nullptr, "process list entry for non-MPP must be nullptr"); - res.process_list_entry = setProcessListElement( - context, - dag_context.dummy_query_string, - dag_context.dummy_ast.get()); - } - (*res.process_list_entry)->setQueryStreams(res); - } + /// Hold element of process list till end of query execution. + res.process_list_entry = process_list_entry; prepareForInputStream(context, QueryProcessingStage::Complete, res.in); - if (!internal) + if (likely(!internal)) logQueryPipeline(logger, res.in); dag_context.attachBlockIO(res);