diff --git a/dbms/src/Flash/Executor/PipelineExecutor.cpp b/dbms/src/Flash/Executor/PipelineExecutor.cpp index d1413e32009..602531fa49e 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutor.cpp @@ -161,4 +161,9 @@ BaseRuntimeStatistics PipelineExecutor::getRuntimeStatistics() const } return runtime_statistics; } + +String PipelineExecutor::getExtraJsonInfo() const +{ + return exec_context.getQueryProfileInfo().toJson(); +} } // namespace DB diff --git a/dbms/src/Flash/Executor/PipelineExecutor.h b/dbms/src/Flash/Executor/PipelineExecutor.h index e8eb69b18d4..b86b3b09b9a 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.h +++ b/dbms/src/Flash/Executor/PipelineExecutor.h @@ -73,6 +73,8 @@ class PipelineExecutor : public QueryExecutor BaseRuntimeStatistics getRuntimeStatistics() const override; + String getExtraJsonInfo() const override; + protected: ExecutionResult execute(ResultHandler && result_handler) override; diff --git a/dbms/src/Flash/Executor/QueryExecutor.h b/dbms/src/Flash/Executor/QueryExecutor.h index 558f3b2504a..6b5fcfda349 100644 --- a/dbms/src/Flash/Executor/QueryExecutor.h +++ b/dbms/src/Flash/Executor/QueryExecutor.h @@ -55,6 +55,8 @@ class QueryExecutor virtual BaseRuntimeStatistics getRuntimeStatistics() const = 0; + virtual String getExtraJsonInfo() const { return "{}"; } + protected: virtual ExecutionResult execute(ResultHandler &&) = 0; diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 80932223202..75c5c221ed4 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -579,6 +579,7 @@ void MPPTask::runImpl() GET_METRIC(tiflash_compute_request_unit, type_mpp).Increment(cpu_ru + read_ru); mpp_task_statistics.setRUInfo( RUConsumption{.cpu_ru = cpu_ru, .cpu_time_ns = cpu_time_ns, .read_ru = read_ru, .read_bytes = read_bytes}); + mpp_task_statistics.setExtraInfo(query_executor_holder->getExtraJsonInfo()); 
mpp_task_statistics.collectRuntimeStatistics(); diff --git a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp index 09883c63b3f..0be18a3abff 100644 --- a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp @@ -114,7 +114,7 @@ void MPPTaskStatistics::logTracingJson() R"(,"compile_start_timestamp":{},"compile_end_timestamp":{})" R"(,"read_wait_index_start_timestamp":{},"read_wait_index_end_timestamp":{})" R"(,"local_input_bytes":{},"remote_input_bytes":{},"output_bytes":{})" - R"(,"status":"{}","error_message":"{}","cpu_ru":{},"read_ru":{},"memory_peak":{}}})", + R"(,"status":"{}","error_message":"{}","cpu_ru":{},"read_ru":{},"memory_peak":{},"extra_info":{}}})", id.gather_id.query_id.start_ts, id.task_id, is_root, @@ -135,7 +135,8 @@ void MPPTaskStatistics::logTracingJson() error_message, ru_info.cpu_ru, ru_info.read_ru, - memory_peak); + memory_peak, + extra_info); } void MPPTaskStatistics::setMemoryPeak(Int64 memory_peak_) @@ -155,6 +156,11 @@ void MPPTaskStatistics::setCompileTimestamp(const Timestamp & start_timestamp, c compile_end_timestamp = end_timestamp; } +void MPPTaskStatistics::setExtraInfo(const String & extra_info_) +{ + extra_info = extra_info_; +} + void MPPTaskStatistics::recordInputBytes(DAGContext & dag_context) { switch (dag_context.getExecutionMode()) diff --git a/dbms/src/Flash/Mpp/MPPTaskStatistics.h b/dbms/src/Flash/Mpp/MPPTaskStatistics.h index 334ead31eb8..ef012210407 100644 --- a/dbms/src/Flash/Mpp/MPPTaskStatistics.h +++ b/dbms/src/Flash/Mpp/MPPTaskStatistics.h @@ -55,6 +55,8 @@ class MPPTaskStatistics void setCompileTimestamp(const Timestamp & start_timestamp, const Timestamp & end_timestamp); + void setExtraInfo(const String & extra_info_); + tipb::SelectResponse genExecutionSummaryResponse(); tipb::TiFlashExecutionInfo genTiFlashExecutionInfo(); @@ -92,5 +94,8 @@ class MPPTaskStatistics // resource RUConsumption ru_info{.cpu_ru = 0.0, .cpu_time_ns = 0, .read_ru = 
0.0, .read_bytes = 0}; Int64 memory_peak = 0; + + // extra + String extra_info = "{}"; }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp index 53b1fda15fb..5ef36b27766 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp @@ -24,44 +24,52 @@ extern const char random_pipeline_model_execute_prefix_failpoint[]; extern const char random_pipeline_model_execute_suffix_failpoint[]; } // namespace FailPoints -#define HANDLE_OP_STATUS(op, op_status, expect_status) \ - switch (op_status) \ - { \ - /* For the expected status, it will not return here, */ \ - /* but instead return control to the macro caller, */ \ - /* who will continue to call the next operator. */ \ - case (expect_status): \ - break; \ - /* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \ - case OperatorStatus::IO_IN: \ - case OperatorStatus::IO_OUT: \ - fillIOOp((op).get()); \ - return (op_status); \ - /* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \ - case OperatorStatus::WAITING: \ - fillAwaitable((op).get()); \ - return (op_status); \ - /* For other status, an immediate return is required. */ \ - default: \ - return (op_status); \ +#define HANDLE_OP_STATUS(op, op_status, expect_status) \ + switch (op_status) \ + { \ + /* For the expected status, it will not return here, */ \ + /* but instead return control to the macro caller, */ \ + /* who will continue to call the next operator. */ \ + case (expect_status): \ + break; \ + /* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \ + case OperatorStatus::IO_IN: \ + case OperatorStatus::IO_OUT: \ + fillIOOp((op).get()); \ + return (op_status); \ + /* For the waiting status, the operator needs to be filled in awaitable for later use in await. 
*/ \ + case OperatorStatus::WAITING: \ + fillAwaitable((op).get()); \ + return (op_status); \ + /* For the wait for notify status, the operator needs to be filled in waiting_for_notify for later use in notify. */ \ + case OperatorStatus::WAIT_FOR_NOTIFY: \ + fillWaitingForNotifyOp((op).get()); \ + return (op_status); \ + /* For other status, an immediate return is required. */ \ + default: \ + return (op_status); \ } -#define HANDLE_LAST_OP_STATUS(op, op_status) \ - assert(op); \ - switch (op_status) \ - { \ - /* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \ - case OperatorStatus::IO_IN: \ - case OperatorStatus::IO_OUT: \ - fillIOOp((op).get()); \ - return (op_status); \ - /* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \ - case OperatorStatus::WAITING: \ - fillAwaitable((op).get()); \ - return (op_status); \ - /* For the last operator, the status will always be returned. */ \ - default: \ - return (op_status); \ +#define HANDLE_LAST_OP_STATUS(op, op_status) \ + assert(op); \ + switch (op_status) \ + { \ + /* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \ + case OperatorStatus::IO_IN: \ + case OperatorStatus::IO_OUT: \ + fillIOOp((op).get()); \ + return (op_status); \ + /* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \ + case OperatorStatus::WAITING: \ + fillAwaitable((op).get()); \ + return (op_status); \ + /* For the wait for notify status, the operator needs to be filled in waiting_for_notify for later use in notify. */ \ + case OperatorStatus::WAIT_FOR_NOTIFY: \ + fillWaitingForNotifyOp((op).get()); \ + return (op_status); \ + /* For the last operator, the status will always be returned. 
*/ \ + default: \ + return (op_status); \ } PipelineExec::PipelineExec(SourceOpPtr && source_op_, TransformOps && transform_ops_, SinkOpPtr && sink_op_) @@ -89,6 +97,13 @@ void PipelineExec::executeSuffix() source_op->operateSuffix(); } +void PipelineExec::notify() +{ + assert(waiting_for_notify); + waiting_for_notify->notify(); + waiting_for_notify = nullptr; +} + OperatorStatus PipelineExec::execute() { auto op_status = executeImpl(); @@ -107,6 +122,10 @@ OperatorStatus PipelineExec::execute() */ OperatorStatus PipelineExec::executeImpl() { + assert(!awaitable); + assert(!io_op); + assert(!waiting_for_notify); + Block block; size_t start_transform_op_index = 0; auto op_status = fetchBlock(block, start_transform_op_index); @@ -156,12 +175,25 @@ OperatorStatus PipelineExec::executeIO() } OperatorStatus PipelineExec::executeIOImpl() { + assert(!waiting_for_notify); + assert(!awaitable); assert(io_op); auto op_status = io_op->executeIO(); - if (op_status == OperatorStatus::WAITING) + switch (op_status) + { + case OperatorStatus::IO_IN: + case OperatorStatus::IO_OUT: + return op_status; + case OperatorStatus::WAITING: fillAwaitable(io_op); - if (op_status != OperatorStatus::IO_IN && op_status != OperatorStatus::IO_OUT) - io_op = nullptr; + break; + case OperatorStatus::WAIT_FOR_NOTIFY: + fillWaitingForNotifyOp(io_op); + break; + default: + break; + } + io_op = nullptr; return op_status; } @@ -177,40 +209,59 @@ OperatorStatus PipelineExec::await() } OperatorStatus PipelineExec::awaitImpl() { + assert(!waiting_for_notify); + assert(!io_op); assert(awaitable); auto op_status = awaitable->await(); - if (op_status == OperatorStatus::IO_IN || op_status == OperatorStatus::IO_OUT) + switch (op_status) + { + case OperatorStatus::WAITING: + return op_status; + case OperatorStatus::IO_IN: + case OperatorStatus::IO_OUT: fillIOOp(awaitable); - if (op_status != OperatorStatus::WAITING) - awaitable = nullptr; + break; + case OperatorStatus::WAIT_FOR_NOTIFY: + 
fillWaitingForNotifyOp(awaitable); + break; + default: + break; + } + awaitable = nullptr; return op_status; } #undef HANDLE_OP_STATUS #undef HANDLE_LAST_OP_STATUS -void PipelineExec::finalizeProfileInfo(UInt64 extra_time) +void PipelineExec::finalizeProfileInfo(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time) { - // `extra_time` usually includes pipeline schedule duration and task queuing time. - // - // The pipeline schedule duration should be added to the pipeline breaker operator(AggConvergent and JoinProbe), + // For the pipeline_breaker_wait_time, it should be added to the pipeline breaker operator(AggConvergent and JoinProbe), // However, if there are multiple pipeline breaker operators within a single pipeline, it can become very complex. // Therefore, to simplify matters, we will include the pipeline schedule duration in the execution time of the source operator. // - // ditto for task queuing time. + // For the queuing_time, it should be evenly distributed across all operators. // // TODO Refining execution summary, excluding extra time from execution time. - // For example: [total_time:6s, execution_time:1s, pending_time:2s, pipeline_waiting_time:3s] + // For example: [total_time:6s, execution_time:1s, queuing_time:2s, pipeline_breaker_wait_time:3s] + + // The execution time of operator[i] = self_time_from_profile_info + sum(self_time_from_profile_info[i-1, .., 0]) + (i + 1) * (queuing_time / operator_num) + (queuing_time % operator_num) + pipeline_breaker_wait_time. + + source_op->getProfileInfo()->execution_time += pipeline_breaker_wait_time; + + UInt64 operator_num = 2 + transform_ops.size(); + UInt64 per_operator_queuing_time = queuing_time / operator_num; - // The execution time of operator[i] = self_time_from_profile_info + sum(self_time_from_profile_info[i-1, .., 0]) + extra_time. 
- source_op->getProfileInfo()->execution_time += extra_time; - extra_time = source_op->getProfileInfo()->execution_time; + source_op->getProfileInfo()->execution_time += per_operator_queuing_time; + // Compensate for the values missing due to rounding. + source_op->getProfileInfo()->execution_time += (queuing_time - (per_operator_queuing_time * operator_num)); + UInt64 time_for_prev_op = source_op->getProfileInfo()->execution_time; for (const auto & transform_op : transform_ops) { - transform_op->getProfileInfo()->execution_time += extra_time; - extra_time = transform_op->getProfileInfo()->execution_time; + transform_op->getProfileInfo()->execution_time += (per_operator_queuing_time + time_for_prev_op); + time_for_prev_op = transform_op->getProfileInfo()->execution_time; } - sink_op->getProfileInfo()->execution_time += extra_time; + sink_op->getProfileInfo()->execution_time += (per_operator_queuing_time + time_for_prev_op); } } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Exec/PipelineExec.h b/dbms/src/Flash/Pipeline/Exec/PipelineExec.h index 275c019cc97..103e0ad3e80 100644 --- a/dbms/src/Flash/Pipeline/Exec/PipelineExec.h +++ b/dbms/src/Flash/Pipeline/Exec/PipelineExec.h @@ -37,7 +37,9 @@ class PipelineExec : private boost::noncopyable OperatorStatus await(); - void finalizeProfileInfo(UInt64 extra_time); + void notify(); + + void finalizeProfileInfo(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time); private: inline OperatorStatus executeImpl(); @@ -62,6 +64,13 @@ class PipelineExec : private boost::noncopyable io_op = op; } + ALWAYS_INLINE void fillWaitingForNotifyOp(Operator * op) + { + assert(!waiting_for_notify); + assert(op); + waiting_for_notify = op; + } + private: SourceOpPtr source_op; TransformOps transform_ops; @@ -72,6 +81,9 @@ class PipelineExec : private boost::noncopyable // hold the operator which is ready for executing io. Operator * io_op = nullptr; + + // hold the operator which is waiting for notify. 
+ Operator * waiting_for_notify = nullptr; }; using PipelineExecPtr = std::unique_ptr; // a set of pipeline_execs running in parallel. diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h index 963433248d1..2f0171b8ba2 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h @@ -76,9 +76,16 @@ class PipeConditionVariable { assert(task); task->notify(); - task->profile_info.elapsedWaitForNotifyTime(); assert(TaskScheduler::instance); - TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task)); + if (unlikely(task->getStatus() == ExecTaskStatus::WAITING)) + { + TaskScheduler::instance->submitToWaitReactor(std::move(task)); + } + else + { + assert(task->getStatus() == ExecTaskStatus::RUNNING); + TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task)); + } } private: diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h index 318e56179d4..d7201697c2f 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h @@ -41,11 +41,11 @@ class PipelineTask ExecTaskStatus awaitImpl() override { return runAwait(); } + void notifyImpl() override { runNotify(); } + void doFinalizeImpl() override { - runFinalize( - profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs() - + profile_info.getWaitForNotifyTimeNs() + getScheduleDuration()); + runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs(), getScheduleDuration()); } }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h index 34274e33880..991813dd52d 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h +++ 
b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h @@ -124,15 +124,21 @@ class PipelineTaskBase } } - void runFinalize(UInt64 extra_time) + void runFinalize(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time) { assert(pipeline_exec); pipeline_exec->executeSuffix(); - pipeline_exec->finalizeProfileInfo(extra_time); + pipeline_exec->finalizeProfileInfo(queuing_time, pipeline_breaker_wait_time); pipeline_exec = nullptr; pipeline_exec_holder.reset(); } + void runNotify() + { + assert(pipeline_exec); + pipeline_exec->notify(); + } + private: PipelineExecPtr pipeline_exec_holder; // To reduce the overheads of `pipeline_exec_holder.get()` diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/SimplePipelineTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/SimplePipelineTask.h index c713bfe95f5..03f30962ca0 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/SimplePipelineTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/SimplePipelineTask.h @@ -40,9 +40,11 @@ class SimplePipelineTask ExecTaskStatus awaitImpl() override { return runAwait(); } + void notifyImpl() override { runNotify(); } + void finalizeImpl() override { - runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs()); + runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs(), 0); } }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp index 2a060173425..c4e09c0e864 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp @@ -151,7 +151,14 @@ ExecTaskStatus Task::await() void Task::notify() { assert(task_status == ExecTaskStatus::WAIT_FOR_NOTIFY); - switchStatus(ExecTaskStatus::RUNNING); + // If the query has been canceled, + // move the task to WaitReactor to quickly trigger the cancel process. 
+ if (unlikely(exec_context.isCancelled())) + switchStatus(ExecTaskStatus::WAITING); + else + switchStatus(ExecTaskStatus::RUNNING); + notifyImpl(); + profile_info.elapsedWaitForNotifyTime(); } void Task::finalize() diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h index 1681eb45e1b..e47ce10f38c 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h @@ -103,6 +103,8 @@ class Task // Avoid allocating memory in `await` if possible. virtual ExecTaskStatus awaitImpl() { return ExecTaskStatus::RUNNING; } + virtual void notifyImpl() {} + // Used to release held resources, just like `Event::finishImpl`. virtual void finalizeImpl() {} diff --git a/dbms/src/Operators/ConcatSourceOp.h b/dbms/src/Operators/ConcatSourceOp.h index 29f47be70e8..38e35257065 100644 --- a/dbms/src/Operators/ConcatSourceOp.h +++ b/dbms/src/Operators/ConcatSourceOp.h @@ -147,6 +147,12 @@ class ConcatSourceOp : public SourceOp return status; } + void notifyImpl() override + { + assert(cur_exec); + cur_exec->notify(); + } + private: bool popExec() { diff --git a/dbms/src/Operators/Operator.cpp b/dbms/src/Operators/Operator.cpp index 109a339be41..827d349487a 100644 --- a/dbms/src/Operators/Operator.cpp +++ b/dbms/src/Operators/Operator.cpp @@ -88,6 +88,12 @@ OperatorStatus Operator::executeIO() return op_status; } +void Operator::notify() +{ + profile_info.update(); + notifyImpl(); +} + OperatorStatus SourceOp::read(Block & block) { CHECK_IS_CANCELLED diff --git a/dbms/src/Operators/Operator.h b/dbms/src/Operators/Operator.h index d6deec19a02..889fcfa05cf 100644 --- a/dbms/src/Operators/Operator.h +++ b/dbms/src/Operators/Operator.h @@ -66,6 +66,8 @@ class Operator // running status may return are NEED_INPUT and HAS_OUTPUT here. OperatorStatus await(); + void notify(); + // These two methods are used to set state, log and etc, and should not perform calculation logic. 
void operatePrefix(); void operateSuffix(); @@ -98,6 +100,8 @@ class Operator virtual OperatorStatus awaitImpl() { throw Exception("Unsupport"); } + virtual void notifyImpl() {} + protected: PipelineExecutorContext & exec_context; const LoggerPtr log;