Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: refactor the extra time calculation in explain analyze (#8987) #9340

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,9 @@ BaseRuntimeStatistics PipelineExecutor::getRuntimeStatistics() const
}
return runtime_statistics;
}

/// Returns the query profile info held by the execution context, serialized through its toJson().
String PipelineExecutor::getExtraJsonInfo() const
{
    const auto & query_profile_info = exec_context.getQueryProfileInfo();
    return query_profile_info.toJson();
}
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class PipelineExecutor : public QueryExecutor

BaseRuntimeStatistics getRuntimeStatistics() const override;

String getExtraJsonInfo() const override;

protected:
ExecutionResult execute(ResultHandler && result_handler) override;

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Executor/QueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class QueryExecutor

virtual BaseRuntimeStatistics getRuntimeStatistics() const = 0;

/// Extra, executor-specific information rendered as a JSON string.
/// The base implementation reports an empty JSON object; subclasses may override it.
virtual String getExtraJsonInfo() const
{
    return "{}";
}

protected:
virtual ExecutionResult execute(ResultHandler &&) = 0;

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ void MPPTask::runImpl()
GET_METRIC(tiflash_compute_request_unit, type_mpp).Increment(cpu_ru + read_ru);
mpp_task_statistics.setRUInfo(
RUConsumption{.cpu_ru = cpu_ru, .cpu_time_ns = cpu_time_ns, .read_ru = read_ru, .read_bytes = read_bytes});
mpp_task_statistics.setExtraInfo(query_executor_holder->getExtraJsonInfo());

mpp_task_statistics.collectRuntimeStatistics();

Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void MPPTaskStatistics::logTracingJson()
R"(,"compile_start_timestamp":{},"compile_end_timestamp":{})"
R"(,"read_wait_index_start_timestamp":{},"read_wait_index_end_timestamp":{})"
R"(,"local_input_bytes":{},"remote_input_bytes":{},"output_bytes":{})"
R"(,"status":"{}","error_message":"{}","cpu_ru":{},"read_ru":{},"memory_peak":{}}})",
R"(,"status":"{}","error_message":"{}","cpu_ru":{},"read_ru":{},"memory_peak":{},"extra_info":{}}})",
id.gather_id.query_id.start_ts,
id.task_id,
is_root,
Expand All @@ -135,7 +135,8 @@ void MPPTaskStatistics::logTracingJson()
error_message,
ru_info.cpu_ru,
ru_info.read_ru,
memory_peak);
memory_peak,
extra_info);
}

void MPPTaskStatistics::setMemoryPeak(Int64 memory_peak_)
Expand All @@ -155,6 +156,11 @@ void MPPTaskStatistics::setCompileTimestamp(const Timestamp & start_timestamp, c
compile_end_timestamp = end_timestamp;
}

// Stores the extra info string that logTracingJson() emits as the "extra_info" field.
// NOTE: the value is inserted into the tracing JSON without surrounding quotes,
// so it has to be a complete JSON value by itself (for example "{}").
void MPPTaskStatistics::setExtraInfo(const String & extra_info_)
{
extra_info = extra_info_;
}

void MPPTaskStatistics::recordInputBytes(DAGContext & dag_context)
{
switch (dag_context.getExecutionMode())
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskStatistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class MPPTaskStatistics

void setCompileTimestamp(const Timestamp & start_timestamp, const Timestamp & end_timestamp);

void setExtraInfo(const String & extra_info_);

tipb::SelectResponse genExecutionSummaryResponse();

tipb::TiFlashExecutionInfo genTiFlashExecutionInfo();
Expand Down Expand Up @@ -92,5 +94,8 @@ class MPPTaskStatistics
// resource
RUConsumption ru_info{.cpu_ru = 0.0, .cpu_time_ns = 0, .read_ru = 0.0, .read_bytes = 0};
Int64 memory_peak = 0;

// extra
String extra_info = "{}";
};
} // namespace DB
159 changes: 105 additions & 54 deletions dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,44 +24,52 @@ extern const char random_pipeline_model_execute_prefix_failpoint[];
extern const char random_pipeline_model_execute_suffix_failpoint[];
} // namespace FailPoints

#define HANDLE_OP_STATUS(op, op_status, expect_status) \
switch (op_status) \
{ \
/* For the expected status, it will not return here, */ \
/* but instead return control to the macro caller, */ \
/* who will continue to call the next operator. */ \
case (expect_status): \
break; \
/* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \
case OperatorStatus::IO_IN: \
case OperatorStatus::IO_OUT: \
fillIOOp((op).get()); \
return (op_status); \
/* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \
case OperatorStatus::WAITING: \
fillAwaitable((op).get()); \
return (op_status); \
/* For other status, an immediate return is required. */ \
default: \
return (op_status); \
// Dispatches on `op_status`, the status produced by `op`.
// Control continues after the macro only when the status equals `expect_status`;
// every other status returns `op_status` from the enclosing function, after recording `op`
// for the IO_IN/IO_OUT, WAITING and WAIT_FOR_NOTIFY statuses.
#define HANDLE_OP_STATUS(op, op_status, expect_status) \
switch (op_status) \
{ \
/* For the expected status, it will not return here, */ \
/* but instead return control to the macro caller, */ \
/* who will continue to call the next operator. */ \
case (expect_status): \
break; \
/* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \
case OperatorStatus::IO_IN: \
case OperatorStatus::IO_OUT: \
fillIOOp((op).get()); \
return (op_status); \
/* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \
case OperatorStatus::WAITING: \
fillAwaitable((op).get()); \
return (op_status); \
/* For the wait for notify status, the operator needs to be filled in waiting_for_notify for later use in notify. */ \
case OperatorStatus::WAIT_FOR_NOTIFY: \
fillWaitingForNotifyOp((op).get()); \
return (op_status); \
/* For other status, an immediate return is required. */ \
default: \
return (op_status); \
}

#define HANDLE_LAST_OP_STATUS(op, op_status) \
assert(op); \
switch (op_status) \
{ \
/* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \
case OperatorStatus::IO_IN: \
case OperatorStatus::IO_OUT: \
fillIOOp((op).get()); \
return (op_status); \
/* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \
case OperatorStatus::WAITING: \
fillAwaitable((op).get()); \
return (op_status); \
/* For the last operator, the status will always be returned. */ \
default: \
return (op_status); \
// Variant of the status dispatch for the last operator of a call chain:
// asserts that `op` is set, records `op` for the IO_IN/IO_OUT, WAITING and WAIT_FOR_NOTIFY
// statuses, and always returns `op_status` from the enclosing function.
#define HANDLE_LAST_OP_STATUS(op, op_status) \
assert(op); \
switch (op_status) \
{ \
/* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \
case OperatorStatus::IO_IN: \
case OperatorStatus::IO_OUT: \
fillIOOp((op).get()); \
return (op_status); \
/* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \
case OperatorStatus::WAITING: \
fillAwaitable((op).get()); \
return (op_status); \
/* For the wait for notify status, the operator needs to be filled in waiting_for_notify for later use in notify. */ \
case OperatorStatus::WAIT_FOR_NOTIFY: \
fillWaitingForNotifyOp((op).get()); \
return (op_status); \
/* For the last operator, the status will always be returned. */ \
default: \
return (op_status); \
}

PipelineExec::PipelineExec(SourceOpPtr && source_op_, TransformOps && transform_ops_, SinkOpPtr && sink_op_)
Expand Down Expand Up @@ -89,6 +97,13 @@ void PipelineExec::executeSuffix()
source_op->operateSuffix();
}

/// Notifies the operator previously recorded as waiting for a notification
/// and then clears that record, so a later execution step starts without one.
void PipelineExec::notify()
{
    assert(waiting_for_notify);
    auto * notified_op = waiting_for_notify;
    notified_op->notify();
    // Reset only after the operator has been notified.
    waiting_for_notify = nullptr;
}

OperatorStatus PipelineExec::execute()
{
auto op_status = executeImpl();
Expand All @@ -107,6 +122,10 @@ OperatorStatus PipelineExec::execute()
*/
OperatorStatus PipelineExec::executeImpl()
{
assert(!awaitable);
assert(!io_op);
assert(!waiting_for_notify);

Block block;
size_t start_transform_op_index = 0;
auto op_status = fetchBlock(block, start_transform_op_index);
Expand Down Expand Up @@ -156,12 +175,25 @@ OperatorStatus PipelineExec::executeIO()
}
OperatorStatus PipelineExec::executeIOImpl()
{
assert(!waiting_for_notify);
assert(!awaitable);
assert(io_op);
auto op_status = io_op->executeIO();
if (op_status == OperatorStatus::WAITING)
switch (op_status)
{
case OperatorStatus::IO_IN:
case OperatorStatus::IO_OUT:
return op_status;
case OperatorStatus::WAITING:
fillAwaitable(io_op);
if (op_status != OperatorStatus::IO_IN && op_status != OperatorStatus::IO_OUT)
io_op = nullptr;
break;
case OperatorStatus::WAIT_FOR_NOTIFY:
fillWaitingForNotifyOp(io_op);
break;
default:
break;
}
io_op = nullptr;
return op_status;
}

Expand All @@ -177,40 +209,59 @@ OperatorStatus PipelineExec::await()
}
OperatorStatus PipelineExec::awaitImpl()
{
assert(!waiting_for_notify);
assert(!io_op);
assert(awaitable);
auto op_status = awaitable->await();
if (op_status == OperatorStatus::IO_IN || op_status == OperatorStatus::IO_OUT)
switch (op_status)
{
case OperatorStatus::WAITING:
return op_status;
case OperatorStatus::IO_IN:
case OperatorStatus::IO_OUT:
fillIOOp(awaitable);
if (op_status != OperatorStatus::WAITING)
awaitable = nullptr;
break;
case OperatorStatus::WAIT_FOR_NOTIFY:
fillWaitingForNotifyOp(awaitable);
break;
default:
break;
}
awaitable = nullptr;
return op_status;
}

#undef HANDLE_OP_STATUS
#undef HANDLE_LAST_OP_STATUS

void PipelineExec::finalizeProfileInfo(UInt64 extra_time)
void PipelineExec::finalizeProfileInfo(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time)
{
// `extra_time` usually includes pipeline schedule duration and task queuing time.
//
// The pipeline schedule duration should be added to the pipeline breaker operator(AggConvergent and JoinProbe),
// For the pipeline_breaker_wait_time, it should be added to the pipeline breaker operator(AggConvergent and JoinProbe),
// However, if there are multiple pipeline breaker operators within a single pipeline, it can become very complex.
// Therefore, to simplify matters, we will include the pipeline schedule duration in the execution time of the source operator.
//
// ditto for task queuing time.
// For the queuing_time, it should be evenly distributed across all operators.
//
// TODO Refining execution summary, excluding extra time from execution time.
// For example: [total_time:6s, execution_time:1s, pending_time:2s, pipeline_waiting_time:3s]
// For example: [total_time:6s, execution_time:1s, queuing_time:2s, pipeline_breaker_wait_time:3s]

// The execution time of operator[i] = self_time_from_profile_info + sum(self_time_from_profile_info[i-1, .., 0]) + pipeline_breaker_wait_time + (i + 1) * queuing_time / operator_num.

source_op->getProfileInfo()->execution_time += pipeline_breaker_wait_time;

UInt64 operator_num = 2 + transform_ops.size();
UInt64 per_operator_queuing_time = queuing_time / operator_num;

// The execution time of operator[i] = self_time_from_profile_info + sum(self_time_from_profile_info[i-1, .., 0]) + extra_time.
source_op->getProfileInfo()->execution_time += extra_time;
extra_time = source_op->getProfileInfo()->execution_time;
source_op->getProfileInfo()->execution_time += per_operator_queuing_time;
// Compensate for the values missing due to rounding.
source_op->getProfileInfo()->execution_time += (queuing_time - (per_operator_queuing_time * operator_num));
UInt64 time_for_prev_op = source_op->getProfileInfo()->execution_time;
for (const auto & transform_op : transform_ops)
{
transform_op->getProfileInfo()->execution_time += extra_time;
extra_time = transform_op->getProfileInfo()->execution_time;
transform_op->getProfileInfo()->execution_time += (per_operator_queuing_time + time_for_prev_op);
time_for_prev_op = transform_op->getProfileInfo()->execution_time;
}
sink_op->getProfileInfo()->execution_time += extra_time;
sink_op->getProfileInfo()->execution_time += (per_operator_queuing_time + time_for_prev_op);
}

} // namespace DB
14 changes: 13 additions & 1 deletion dbms/src/Flash/Pipeline/Exec/PipelineExec.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ class PipelineExec : private boost::noncopyable

OperatorStatus await();

void finalizeProfileInfo(UInt64 extra_time);
void notify();

void finalizeProfileInfo(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time);

private:
inline OperatorStatus executeImpl();
Expand All @@ -62,6 +64,13 @@ class PipelineExec : private boost::noncopyable
io_op = op;
}

// Records `op` as the operator that is waiting for a notification.
// At most one operator may be recorded at a time, and `op` must not be null (both asserted).
ALWAYS_INLINE void fillWaitingForNotifyOp(Operator * op)
{
assert(!waiting_for_notify);
assert(op);
waiting_for_notify = op;
}

private:
SourceOpPtr source_op;
TransformOps transform_ops;
Expand All @@ -72,6 +81,9 @@ class PipelineExec : private boost::noncopyable

// hold the operator which is ready for executing io.
Operator * io_op = nullptr;

// hold the operator which is waiting for notify.
Operator * waiting_for_notify = nullptr;
};
using PipelineExecPtr = std::unique_ptr<PipelineExec>;
// a set of pipeline_execs running in parallel.
Expand Down
11 changes: 9 additions & 2 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,16 @@ class PipeConditionVariable
{
assert(task);
task->notify();
task->profile_info.elapsedWaitForNotifyTime();
assert(TaskScheduler::instance);
TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task));
if (unlikely(task->getStatus() == ExecTaskStatus::WAITING))
{
TaskScheduler::instance->submitToWaitReactor(std::move(task));
}
else
{
assert(task->getStatus() == ExecTaskStatus::RUNNING);
TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task));
}
}

private:
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ class PipelineTask

ExecTaskStatus awaitImpl() override { return runAwait(); }

// Forwards the task notification to runNotify().
void notifyImpl() override { runNotify(); }

void doFinalizeImpl() override
{
runFinalize(
profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs()
+ profile_info.getWaitForNotifyTimeNs() + getScheduleDuration());
runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs(), getScheduleDuration());
}
};
} // namespace DB
10 changes: 8 additions & 2 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,21 @@ class PipelineTaskBase
}
}

void runFinalize(UInt64 extra_time)
void runFinalize(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time)
{
assert(pipeline_exec);
pipeline_exec->executeSuffix();
pipeline_exec->finalizeProfileInfo(extra_time);
pipeline_exec->finalizeProfileInfo(queuing_time, pipeline_breaker_wait_time);
pipeline_exec = nullptr;
pipeline_exec_holder.reset();
}

// Forwards a notification to the pipeline exec.
// Requires pipeline_exec to still be set (asserted); runFinalize() resets it to nullptr.
void runNotify()
{
assert(pipeline_exec);
pipeline_exec->notify();
}

private:
PipelineExecPtr pipeline_exec_holder;
// To reduce the overheads of `pipeline_exec_holder.get()`
Expand Down
Loading