Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: refactor the extra time calculation in explain analyze (#8987) #9047

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,9 @@ BaseRuntimeStatistics PipelineExecutor::getRuntimeStatistics() const
}
return runtime_statistics;
}

String PipelineExecutor::getExtraJsonInfo() const
{
return exec_context.getQueryProfileInfo().toJson();
}
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class PipelineExecutor : public QueryExecutor

BaseRuntimeStatistics getRuntimeStatistics() const override;

String getExtraJsonInfo() const override;

protected:
ExecutionResult execute(ResultHandler && result_handler) override;

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Executor/QueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class QueryExecutor

virtual BaseRuntimeStatistics getRuntimeStatistics() const = 0;

virtual String getExtraJsonInfo() const { return "{}"; }

protected:
virtual ExecutionResult execute(ResultHandler &&) = 0;

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ void MPPTask::runImpl()
LOG_DEBUG(log, "mpp finish with request unit: cpu={} read={}", cpu_ru, read_ru);
GET_METRIC(tiflash_compute_request_unit, type_mpp).Increment(cpu_ru + read_ru);
mpp_task_statistics.setRU(cpu_ru, read_ru);

mpp_task_statistics.setExtraInfo(query_executor_holder->getExtraJsonInfo());
mpp_task_statistics.collectRuntimeStatistics();

auto runtime_statistics = query_executor_holder->getRuntimeStatistics();
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void MPPTaskStatistics::logTracingJson()
R"(,"compile_start_timestamp":{},"compile_end_timestamp":{})"
R"(,"read_wait_index_start_timestamp":{},"read_wait_index_end_timestamp":{})"
R"(,"local_input_bytes":{},"remote_input_bytes":{},"output_bytes":{})"
R"(,"status":"{}","error_message":"{}","cpu_ru":{},"read_ru":{},"memory_peak":{}}})",
R"(,"status":"{}","error_message":"{}","cpu_ru":{},"read_ru":{},"memory_peak":{},"extra_info":{}}})",
id.gather_id.query_id.start_ts,
id.task_id,
is_root,
Expand All @@ -135,7 +135,8 @@ void MPPTaskStatistics::logTracingJson()
error_message,
cpu_ru,
read_ru,
memory_peak);
memory_peak,
extra_info);
}

void MPPTaskStatistics::setMemoryPeak(Int64 memory_peak_)
Expand All @@ -155,6 +156,11 @@ void MPPTaskStatistics::setCompileTimestamp(const Timestamp & start_timestamp, c
compile_end_timestamp = end_timestamp;
}

void MPPTaskStatistics::setExtraInfo(const String & extra_info_)
{
extra_info = extra_info_;
}

void MPPTaskStatistics::recordInputBytes(DAGContext & dag_context)
{
switch (dag_context.getExecutionMode())
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskStatistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class MPPTaskStatistics

void setCompileTimestamp(const Timestamp & start_timestamp, const Timestamp & end_timestamp);

void setExtraInfo(const String & extra_info_);

tipb::SelectResponse genExecutionSummaryResponse();

tipb::TiFlashExecutionInfo genTiFlashExecutionInfo();
Expand Down Expand Up @@ -93,5 +95,8 @@ class MPPTaskStatistics
RU cpu_ru = 0;
RU read_ru = 0;
Int64 memory_peak = 0;

// extra
String extra_info = "{}";
};
} // namespace DB
30 changes: 18 additions & 12 deletions dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,28 +189,34 @@ OperatorStatus PipelineExec::awaitImpl()
#undef HANDLE_OP_STATUS
#undef HANDLE_LAST_OP_STATUS

void PipelineExec::finalizeProfileInfo(UInt64 extra_time)
void PipelineExec::finalizeProfileInfo(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time)
{
// `extra_time` usually includes pipeline schedule duration and task queuing time.
//
// The pipeline schedule duration should be added to the pipeline breaker operator(AggConvergent and JoinProbe),
// For the pipeline_breaker_wait_time, it should be added to the pipeline breaker operator(AggConvergent and JoinProbe),
// However, if there are multiple pipeline breaker operators within a single pipeline, it can become very complex.
// Therefore, to simplify matters, we will include the pipeline schedule duration in the execution time of the source operator.
//
// ditto for task queuing time.
// For the queuing_time, it should be evenly distributed across all operators.
//
// TODO Refining execution summary, excluding extra time from execution time.
// For example: [total_time:6s, execution_time:1s, pending_time:2s, pipeline_waiting_time:3s]
// For example: [total_time:6s, execution_time:1s, queuing_time:2s, pipeline_breaker_wait_time:3s]

// The execution time of operator[i] = self_time_from_profile_info + sum(self_time_from_profile_info[i-1, .., 0]) + (i + 1) * extra_time / operator_num.

source_op->getProfileInfo()->execution_time += pipeline_breaker_wait_time;

UInt64 operator_num = 2 + transform_ops.size();
UInt64 per_operator_queuing_time = queuing_time / operator_num;

// The execution time of operator[i] = self_time_from_profile_info + sum(self_time_from_profile_info[i-1, .., 0]) + extra_time.
source_op->getProfileInfo()->execution_time += extra_time;
extra_time = source_op->getProfileInfo()->execution_time;
source_op->getProfileInfo()->execution_time += per_operator_queuing_time;
// Compensate for the values missing due to rounding.
source_op->getProfileInfo()->execution_time += (queuing_time - (per_operator_queuing_time * operator_num));
UInt64 time_for_prev_op = source_op->getProfileInfo()->execution_time;
for (const auto & transform_op : transform_ops)
{
transform_op->getProfileInfo()->execution_time += extra_time;
extra_time = transform_op->getProfileInfo()->execution_time;
transform_op->getProfileInfo()->execution_time += (per_operator_queuing_time + time_for_prev_op);
time_for_prev_op = transform_op->getProfileInfo()->execution_time;
}
sink_op->getProfileInfo()->execution_time += extra_time;
sink_op->getProfileInfo()->execution_time += (per_operator_queuing_time + time_for_prev_op);
}

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Exec/PipelineExec.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class PipelineExec : private boost::noncopyable

OperatorStatus await();

void finalizeProfileInfo(UInt64 extra_time);
void finalizeProfileInfo(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time);

private:
inline OperatorStatus executeImpl();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PipelineTask

void doFinalizeImpl() override
{
runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs() + getScheduleDuration());
runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs(), getScheduleDuration());
}
};
} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ class PipelineTaskBase
}
}

void runFinalize(UInt64 extra_time)
void runFinalize(UInt64 queuing_time, UInt64 pipeline_breaker_wait_time)
{
assert(pipeline_exec);
pipeline_exec->executeSuffix();
pipeline_exec->finalizeProfileInfo(extra_time);
pipeline_exec->finalizeProfileInfo(queuing_time, pipeline_breaker_wait_time);
pipeline_exec = nullptr;
pipeline_exec_holder.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class SimplePipelineTask

void finalizeImpl() override
{
runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs());
runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs(), 0);
}
};
} // namespace DB