Skip to content

Commit

Permalink
.*: spilt profile info from Interpreters::Join and implement `Pipel…
Browse files Browse the repository at this point in the history
…ineExecutor::getRuntimeStatistics` (#7778)

ref #6518
  • Loading branch information
SeaRise authored Jul 13, 2023
1 parent 24acf81 commit c04008c
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 11 deletions.
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ using MPPReceiverSetPtr = std::shared_ptr<MPPReceiverSet>;
class CoprocessorReader;
using CoprocessorReaderPtr = std::shared_ptr<CoprocessorReader>;

class Join;
using JoinPtr = std::shared_ptr<Join>;
struct JoinProfileInfo;
using JoinProfileInfoPtr = std::shared_ptr<JoinProfileInfo>;
struct JoinExecuteInfo
{
String build_side_root_executor_id;
JoinPtr join_ptr;
JoinProfileInfoPtr join_profile_info;
BlockInputStreams join_build_streams;
OperatorProfileInfos join_build_profile_infos;
};
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ void DAGQueryBlockInterpreter::recordJoinExecuteInfo(size_t build_side_index, co
const auto * build_side_root_executor = query_block.children[build_side_index]->root;
JoinExecuteInfo join_execute_info;
join_execute_info.build_side_root_executor_id = build_side_root_executor->executor_id();
join_execute_info.join_ptr = join_ptr;
assert(join_execute_info.join_ptr);
join_execute_info.join_profile_info = join_ptr->profile_info;
assert(join_execute_info.join_profile_info);
dagContext().getJoinExecuteInfoMap()[query_block.source_name] = std::move(join_execute_info);
}

Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Flash/Executor/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,21 @@ RU PipelineExecutor::collectRequestUnit()

Block PipelineExecutor::getSampleBlock() const
{
assert(root_pipeline);
return root_pipeline->getSampleBlock();
}

BaseRuntimeStatistics PipelineExecutor::getRuntimeStatistics() const
{
// TODO support getRuntimeStatistics
assert(root_pipeline);
auto final_plan_exec_id = root_pipeline->getFinalPlanExecId();
BaseRuntimeStatistics runtime_statistics;
if (!final_plan_exec_id.empty())
{
const auto & final_profile_infos = context.getDAGContext()->getOperatorProfileInfosMap()[final_plan_exec_id];
for (const auto & profile_info : final_profile_infos)
runtime_statistics.append(*profile_info);
}
return runtime_statistics;
}
} // namespace DB
12 changes: 12 additions & 0 deletions dbms/src/Flash/Pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ void Pipeline::addGetResultSink(const ResultQueuePtr & result_queue)
addPlanNode(get_result_sink);
}

String Pipeline::getFinalPlanExecId() const
{
// NOLINTNEXTLINE(modernize-loop-convert)
for (auto it = plan_nodes.crbegin(); it != plan_nodes.crend(); ++it)
{
const auto & plan_node = *it;
if (plan_node->isTiDBOperator())
return plan_node->execId();
}
return "";
}

PipelineExecGroup Pipeline::buildExecGroup(PipelineExecutorContext & exec_context, Context & context, size_t concurrency)
{
RUNTIME_CHECK(!plan_nodes.empty());
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class Pipeline : public std::enable_shared_from_this<Pipeline>
/// taskn──┘ └──►taskm
EventPtr complete(PipelineExecutorContext & exec_context);

String getFinalPlanExecId() const;

private:
void toSelfString(FmtBuffer & buffer, size_t level) const;

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ void recordJoinExecuteInfo(
{
JoinExecuteInfo join_execute_info;
join_execute_info.build_side_root_executor_id = build_side_executor_id;
join_execute_info.join_ptr = join_ptr;
RUNTIME_CHECK(join_execute_info.join_ptr);
join_execute_info.join_profile_info = join_ptr->profile_info;
RUNTIME_CHECK(join_execute_info.join_profile_info);
dag_context.getJoinExecuteInfoMap()[executor_id] = std::move(join_execute_info);
}
} // namespace
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Statistics/JoinImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ void JoinStatistics::collectExtraRuntimeDetail()
if (it != join_execute_info_map.end())
{
const auto & join_execute_info = it->second;
peak_build_bytes_usage = join_execute_info.join_ptr->getPeakBuildBytesUsage();
peak_build_bytes_usage = join_execute_info.join_profile_info->peak_build_bytes_usage;
build_side_child = join_execute_info.build_side_root_executor_id;
is_spill_enabled = join_execute_info.join_ptr->isEnableSpill();
is_spilled = join_execute_info.join_ptr->isSpilled();
is_spill_enabled = join_execute_info.join_profile_info->is_spill_enabled;
is_spilled = join_execute_info.join_profile_info->is_spilled;
switch (dag_context.getExecutionMode())
{
case ExecutionMode::None:
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1540,6 +1540,13 @@ void Join::workAfterBuildFinish()
finalizeRuntimeFilter();
}

void Join::finalizeProfileInfo()
{
profile_info->is_spill_enabled = isEnableSpill();
profile_info->is_spilled = isSpilled();
profile_info->peak_build_bytes_usage = getPeakBuildBytesUsage();
}

void Join::workAfterProbeFinish()
{
if (isEnableSpill())
Expand All @@ -1554,6 +1561,7 @@ void Join::workAfterProbeFinish()
}
}
}
finalizeProfileInfo();
}

void Join::waitUntilAllBuildFinished() const
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,19 @@
#include <Interpreters/ProbeProcessInfo.h>
#include <Interpreters/SettingsCommon.h>

#include <memory>
#include <shared_mutex>

namespace DB
{
struct JoinProfileInfo
{
UInt64 peak_build_bytes_usage = 0;
bool is_spill_enabled = false;
bool is_spilled = false;
};
using JoinProfileInfoPtr = std::shared_ptr<JoinProfileInfo>;

class Join;
using JoinPtr = std::shared_ptr<Join>;

Expand Down Expand Up @@ -277,6 +286,8 @@ class Join
// used to name the column that records matched map entry before other conditions filter
const String flag_mapped_entry_helper_name;

const JoinProfileInfoPtr profile_info = std::make_shared<JoinProfileInfo>();

private:
friend class ScanHashMapAfterProbeBlockInputStream;

Expand Down Expand Up @@ -450,6 +461,8 @@ class Join
void workAfterProbeFinish();

void generateRuntimeFilterValues(const Block & block);

void finalizeProfileInfo();
};

} // namespace DB

0 comments on commit c04008c

Please sign in to comment.