Skip to content

Commit

Permalink
Pipeline: refactor the extra time calculation in explain analyze (#8987
Browse files Browse the repository at this point in the history
…) (#9340)

close #8962

Co-authored-by: SeaRise <[email protected]>
Co-authored-by: JaySon <[email protected]>
  • Loading branch information
3 people authored Sep 4, 2024
1 parent eb585f7 commit 0d9d397
Show file tree
Hide file tree
Showing 17 changed files with 190 additions and 66 deletions.
5 changes: 5 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,9 @@ BaseRuntimeStatistics PipelineExecutor::getRuntimeStatistics() const
}
return runtime_statistics;
}

String PipelineExecutor::getExtraJsonInfo() const
{
return exec_context.getQueryProfileInfo().toJson();
}
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class PipelineExecutor : public QueryExecutor

BaseRuntimeStatistics getRuntimeStatistics() const override;

String getExtraJsonInfo() const override;

protected:
ExecutionResult execute(ResultHandler && result_handler) override;

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Executor/QueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class QueryExecutor

virtual BaseRuntimeStatistics getRuntimeStatistics() const = 0;

virtual String getExtraJsonInfo() const { return "{}"; }

protected:
virtual ExecutionResult execute(ResultHandler &&) = 0;

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ void MPPTask::runImpl()
GET_METRIC(tiflash_compute_request_unit, type_mpp).Increment(cpu_ru + read_ru);
mpp_task_statistics.setRUInfo(
RUConsumption{.cpu_ru = cpu_ru, .cpu_time_ns = cpu_time_ns, .read_ru = read_ru, .read_bytes = read_bytes});
mpp_task_statistics.setExtraInfo(query_executor_holder->getExtraJsonInfo());

mpp_task_statistics.collectRuntimeStatistics();

Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void MPPTaskStatistics::logTracingJson()
R"(,"compile_start_timestamp":{},"compile_end_timestamp":{})"
R"(,"read_wait_index_start_timestamp":{},"read_wait_index_end_timestamp":{})"
R"(,"local_input_bytes":{},"remote_input_bytes":{},"output_bytes":{})"
R"(,"status":"{}","error_message":"{}","cpu_ru":{},"read_ru":{},"memory_peak":{}}})",
R"(,"status":"{}","error_message":"{}","cpu_ru":{},"read_ru":{},"memory_peak":{},"extra_info":{}}})",
id.gather_id.query_id.start_ts,
id.task_id,
is_root,
Expand All @@ -135,7 +135,8 @@ void MPPTaskStatistics::logTracingJson()
error_message,
ru_info.cpu_ru,
ru_info.read_ru,
memory_peak);
memory_peak,
extra_info);
}

void MPPTaskStatistics::setMemoryPeak(Int64 memory_peak_)
Expand All @@ -155,6 +156,11 @@ void MPPTaskStatistics::setCompileTimestamp(const Timestamp & start_timestamp, c
compile_end_timestamp = end_timestamp;
}

void MPPTaskStatistics::setExtraInfo(const String & extra_info_)
{
extra_info = extra_info_;
}

void MPPTaskStatistics::recordInputBytes(DAGContext & dag_context)
{
switch (dag_context.getExecutionMode())
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskStatistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class MPPTaskStatistics

void setCompileTimestamp(const Timestamp & start_timestamp, const Timestamp & end_timestamp);

void setExtraInfo(const String & extra_info_);

tipb::SelectResponse genExecutionSummaryResponse();

tipb::TiFlashExecutionInfo genTiFlashExecutionInfo();
Expand Down Expand Up @@ -92,5 +94,8 @@ class MPPTaskStatistics
// resource
RUConsumption ru_info{.cpu_ru = 0.0, .cpu_time_ns = 0, .read_ru = 0.0, .read_bytes = 0};
Int64 memory_peak = 0;

// extra
String extra_info = "{}";
};
} // namespace DB
159 changes: 105 additions & 54 deletions dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,44 +24,52 @@ extern const char random_pipeline_model_execute_prefix_failpoint[];
extern const char random_pipeline_model_execute_suffix_failpoint[];
} // namespace FailPoints

#define HANDLE_OP_STATUS(op, op_status, expect_status) \
switch (op_status) \
{ \
/* For the expected status, it will not return here, */ \
/* but instead return control to the macro caller, */ \
/* who will continue to call the next operator. */ \
case (expect_status): \
break; \
/* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \
case OperatorStatus::IO_IN: \
case OperatorStatus::IO_OUT: \
fillIOOp((op).get()); \
return (op_status); \
/* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \
case OperatorStatus::WAITING: \
fillAwaitable((op).get()); \
return (op_status); \
/* For other status, an immediate return is required. */ \
default: \
return (op_status); \
#define HANDLE_OP_STATUS(op, op_status, expect_status) \
switch (op_status) \
{ \
/* For the expected status, it will not return here, */ \
/* but instead return control to the macro caller, */ \
/* who will continue to call the next operator. */ \
case (expect_status): \
break; \
/* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \
case OperatorStatus::IO_IN: \
case OperatorStatus::IO_OUT: \
fillIOOp((op).get()); \
return (op_status); \
/* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \
case OperatorStatus::WAITING: \
fillAwaitable((op).get()); \
return (op_status); \
/* For the wait for notify status, the operator needs to be filled in awaitable for later use in await. */ \
case OperatorStatus::WAIT_FOR_NOTIFY: \
fillWaitingForNotifyOp((op).get()); \
return (op_status); \
/* For other status, an immediate return is required. */ \
default: \
return (op_status); \
}

#define HANDLE_LAST_OP_STATUS(op, op_status) \
assert(op); \
switch (op_status) \
{ \
/* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \
case OperatorStatus::IO_IN: \
case OperatorStatus::IO_OUT: \
fillIOOp((op).get()); \
return (op_status); \
/* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \
case OperatorStatus::WAITING: \
fillAwaitable((op).get()); \
return (op_status); \
/* For the last operator, the status will always be returned. */ \
default: \
return (op_status); \
#define HANDLE_LAST_OP_STATUS(op, op_status) \
assert(op); \
switch (op_status) \
{ \
/* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \
case OperatorStatus::IO_IN: \
case OperatorStatus::IO_OUT: \
fillIOOp((op).get()); \
return (op_status); \
/* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \
case OperatorStatus::WAITING: \
fillAwaitable((op).get()); \
return (op_status); \
/* For the wait for notify status, the operator needs to be filled in awaitable for later use in await. */ \
case OperatorStatus::WAIT_FOR_NOTIFY: \
fillWaitingForNotifyOp((op).get()); \
return (op_status); \
/* For the last operator, the status will always be returned. */ \
default: \
return (op_status); \
}

PipelineExec::PipelineExec(SourceOpPtr && source_op_, TransformOps && transform_ops_, SinkOpPtr && sink_op_)
Expand Down Expand Up @@ -89,6 +97,13 @@ void PipelineExec::executeSuffix()
source_op->operateSuffix();
}

void PipelineExec::notify()
{
assert(waiting_for_notify);
waiting_for_notify->notify();
waiting_for_notify = nullptr;
}

OperatorStatus PipelineExec::execute()
{
auto op_status = executeImpl();
Expand All @@ -107,6 +122,10 @@ OperatorStatus PipelineExec::execute()
*/
OperatorStatus PipelineExec::executeImpl()
{
assert(!awaitable);
assert(!io_op);
assert(!waiting_for_notify);

Block block;
size_t start_transform_op_index = 0;
auto op_status = fetchBlock(block, start_transform_op_index);
Expand Down Expand Up @@ -156,12 +175,25 @@ OperatorStatus PipelineExec::executeIO()
}
OperatorStatus PipelineExec::executeIOImpl()
{
assert(!waiting_for_notify);
assert(!awaitable);
assert(io_op);
auto op_status = io_op->executeIO();
if (op_status == OperatorStatus::WAITING)
switch (op_status)
{
case OperatorStatus::IO_IN:
case OperatorStatus::IO_OUT:
return op_status;
case OperatorStatus::WAITING:
fillAwaitable(io_op);
if (op_status != OperatorStatus::IO_IN && op_status != OperatorStatus::IO_OUT)
io_op = nullptr;
break;
case OperatorStatus::WAIT_FOR_NOTIFY:
fillWaitingForNotifyOp(io_op);
break;
default:
break;
}
io_op = nullptr;
return op_status;
}

Expand All @@ -177,40 +209,59 @@ OperatorStatus PipelineExec::await()
}
OperatorStatus PipelineExec::awaitImpl()
{
assert(!waiting_for_notify);
assert(!io_op);
assert(awaitable);
auto op_status = awaitable->await();
if (op_status == OperatorStatus::IO_IN || op_status == OperatorStatus::IO_OUT)
switch (op_status)
{
case OperatorStatus::WAITING:
return op_status;
case OperatorStatus::IO_IN:
case OperatorStatus::IO_OUT:
fillIOOp(awaitable);
if (op_status != OperatorStatus::WAITING)
awaitable = nullptr;
break;
case OperatorStatus::WAIT_FOR_NOTIFY:
fillWaitingForNotifyOp(awaitable);
break;
default:
break;
}
awaitable = nullptr;
return op_status;
}

#undef HANDLE_OP_STATUS
#undef HANDLE_LAST_OP_STATUS

void PipelineExec::finalizeProfileInfo(UInt64 extra_time)
void PipelineExec::finalizeProfileInfo(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time)
{
// `extra_time` usually includes pipeline schedule duration and task queuing time.
//
// The pipeline schedule duration should be added to the pipeline breaker operator(AggConvergent and JoinProbe),
// For the pipeline_breaker_wait_time, it should be added to the pipeline breaker operator(AggConvergent and JoinProbe),
// However, if there are multiple pipeline breaker operators within a single pipeline, it can become very complex.
// Therefore, to simplify matters, we will include the pipeline schedule duration in the execution time of the source operator.
//
// ditto for task queuing time.
// For the queuing_time, it should be evenly distributed across all operators.
//
// TODO Refining execution summary, excluding extra time from execution time.
// For example: [total_time:6s, execution_time:1s, pending_time:2s, pipeline_waiting_time:3s]
// For example: [total_time:6s, execution_time:1s, queuing_time:2s, pipeline_breaker_wait_time:3s]

// The execution time of operator[i] = self_time_from_profile_info + sum(self_time_from_profile_info[i-1, .., 0]) + (i + 1) * extra_time / operator_num.

source_op->getProfileInfo()->execution_time += pipeline_breaker_wait_time;

UInt64 operator_num = 2 + transform_ops.size();
UInt64 per_operator_queuing_time = queuing_time / operator_num;

// The execution time of operator[i] = self_time_from_profile_info + sum(self_time_from_profile_info[i-1, .., 0]) + extra_time.
source_op->getProfileInfo()->execution_time += extra_time;
extra_time = source_op->getProfileInfo()->execution_time;
source_op->getProfileInfo()->execution_time += per_operator_queuing_time;
// Compensate for the values missing due to rounding.
source_op->getProfileInfo()->execution_time += (queuing_time - (per_operator_queuing_time * operator_num));
UInt64 time_for_prev_op = source_op->getProfileInfo()->execution_time;
for (const auto & transform_op : transform_ops)
{
transform_op->getProfileInfo()->execution_time += extra_time;
extra_time = transform_op->getProfileInfo()->execution_time;
transform_op->getProfileInfo()->execution_time += (per_operator_queuing_time + time_for_prev_op);
time_for_prev_op = transform_op->getProfileInfo()->execution_time;
}
sink_op->getProfileInfo()->execution_time += extra_time;
sink_op->getProfileInfo()->execution_time += (per_operator_queuing_time + time_for_prev_op);
}

} // namespace DB
14 changes: 13 additions & 1 deletion dbms/src/Flash/Pipeline/Exec/PipelineExec.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ class PipelineExec : private boost::noncopyable

OperatorStatus await();

void finalizeProfileInfo(UInt64 extra_time);
void notify();

void finalizeProfileInfo(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time);

private:
inline OperatorStatus executeImpl();
Expand All @@ -62,6 +64,13 @@ class PipelineExec : private boost::noncopyable
io_op = op;
}

ALWAYS_INLINE void fillWaitingForNotifyOp(Operator * op)
{
assert(!waiting_for_notify);
assert(op);
waiting_for_notify = op;
}

private:
SourceOpPtr source_op;
TransformOps transform_ops;
Expand All @@ -72,6 +81,9 @@ class PipelineExec : private boost::noncopyable

// hold the operator which is ready for executing io.
Operator * io_op = nullptr;

// hold the operator which is waiting for notify.
Operator * waiting_for_notify = nullptr;
};
using PipelineExecPtr = std::unique_ptr<PipelineExec>;
// a set of pipeline_execs running in parallel.
Expand Down
11 changes: 9 additions & 2 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,16 @@ class PipeConditionVariable
{
assert(task);
task->notify();
task->profile_info.elapsedWaitForNotifyTime();
assert(TaskScheduler::instance);
TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task));
if (unlikely(task->getStatus() == ExecTaskStatus::WAITING))
{
TaskScheduler::instance->submitToWaitReactor(std::move(task));
}
else
{
assert(task->getStatus() == ExecTaskStatus::RUNNING);
TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task));
}
}

private:
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ class PipelineTask

ExecTaskStatus awaitImpl() override { return runAwait(); }

void notifyImpl() override { runNotify(); }

void doFinalizeImpl() override
{
runFinalize(
profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs()
+ profile_info.getWaitForNotifyTimeNs() + getScheduleDuration());
runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs(), getScheduleDuration());
}
};
} // namespace DB
10 changes: 8 additions & 2 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,21 @@ class PipelineTaskBase
}
}

void runFinalize(UInt64 extra_time)
void runFinalize(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time)
{
assert(pipeline_exec);
pipeline_exec->executeSuffix();
pipeline_exec->finalizeProfileInfo(extra_time);
pipeline_exec->finalizeProfileInfo(queuing_time, pipeline_breaker_wait_time);
pipeline_exec = nullptr;
pipeline_exec_holder.reset();
}

void runNotify()
{
assert(pipeline_exec);
pipeline_exec->notify();
}

private:
PipelineExecPtr pipeline_exec_holder;
// To reduce the overheads of `pipeline_exec_holder.get()`
Expand Down
Loading

0 comments on commit 0d9d397

Please sign in to comment.