Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Refine execution summary with ExecutorStatisticsCollector #6995

Merged
merged 27 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,10 @@
#include <Flash/Coprocessor/RequestUtils.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Planner/ExecutorIdGenerator.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Storages/Transaction/TMTContext.h>
#include <tipb/executor.pb.h>

namespace DB
{
Expand Down Expand Up @@ -63,6 +65,7 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
initListBasedExecutors();
}
SeaRise marked this conversation as resolved.
Show resolved Hide resolved

// for mpp
Expand All @@ -88,6 +91,7 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMet
// only mpp task has join executor.
initExecutorIdToJoinIdMap();
initOutputInfo();
initListBasedExecutors();
}

DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const DM::DisaggTaskId & task_id_, TablesRegionsInfo && tables_regions_info_, const String & compute_node_host_, LoggerPtr log_)
Expand All @@ -113,6 +117,7 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const DM::DisaggTa
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
initListBasedExecutors();
}

// for test
Expand Down Expand Up @@ -153,6 +158,7 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, String log_identif
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
initListBasedExecutors();
}

void DAGContext::initOutputInfo()
Expand All @@ -173,14 +179,19 @@ void DAGContext::initOutputInfo()
keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock;
}

String DAGContext::getRootExecutorId()
void DAGContext::initListBasedExecutors()
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
{
// If return_executor_id is false, we can get the generated executor_id from list_based_executors_order.
return return_executor_id
? root_executor_id
: (list_based_executors_order.empty()
? ""
: list_based_executors_order.back());
if (!return_executor_id)
{
ExecutorIdGenerator id_generator;
traverseExecutorsReverse(dag_request, [&](const tipb::Executor & executor) {
const auto & executor_id = id_generator.generate(executor);
list_based_executors_order.push_back(executor_id);
auto * mutable_executor = const_cast<tipb::Executor *>(&executor);
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
mutable_executor->set_executor_id(executor_id);
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
return true;
});
}
}

bool DAGContext::allowZeroInDate() const
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -273,7 +273,6 @@ class DAGContext
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

KeyspaceID getKeyspaceID() const { return keyspace_id; }
String getRootExecutorId();

const tipb::DAGRequest * dag_request;
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
Expand Down Expand Up @@ -327,6 +326,7 @@ class DAGContext
private:
void initExecutorIdToJoinIdMap();
void initOutputInfo();
void initListBasedExecutors();

private:
std::shared_ptr<ProcessListEntry> process_list_entry;
Expand Down
12 changes: 7 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/ExecutionSummaryCollector.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
#include <Flash/Statistics/ExecutorStatisticsCollector.h>
#include <Flash/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
Expand Down Expand Up @@ -114,8 +114,9 @@ try

if (dag_context.collect_execution_summaries)
{
ExecutionSummaryCollector summary_collector(dag_context);
summary_collector.addExecuteSummaries(*dag_response);
ExecutorStatisticsCollector statistics_collector(log->identifier());
statistics_collector.initialize(&dag_context);
statistics_collector.addExecuteSummaries(*dag_response);
}
}
else
Expand Down Expand Up @@ -146,8 +147,9 @@ try

if (dag_context.collect_execution_summaries)
{
ExecutionSummaryCollector summary_collector(dag_context);
auto execution_summary_response = summary_collector.genExecutionSummaryResponse();
ExecutorStatisticsCollector statistics_collector(log->identifier());
statistics_collector.initialize(&dag_context);
auto execution_summary_response = statistics_collector.genExecutionSummaryResponse();
streaming_writer->write(execution_summary_response);
}
}
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -199,15 +199,15 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrFi
if (executors[i].has_executor_id())
source_name = executors[i].executor_id();
else
source_name = std::to_string(i) + "_tablescan";
source_name = fmt::format("table_scan_{}", i);
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
break;
case tipb::ExecType::TypeSelection:
GET_METRIC(tiflash_coprocessor_executor_count, type_sel).Increment();
assignOrThrowException(&selection, &executors[i], SEL_NAME);
if (executors[i].has_executor_id())
selection_name = executors[i].executor_id();
else
selection_name = std::to_string(i) + "_selection";
selection_name = fmt::format("selection_{}", i);
break;
case tipb::ExecType::TypeStreamAgg:
RUNTIME_CHECK_MSG(executors[i].aggregation().group_by_size() == 0, STREAM_AGG_ERROR);
Expand All @@ -217,27 +217,27 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrFi
if (executors[i].has_executor_id())
aggregation_name = executors[i].executor_id();
else
aggregation_name = std::to_string(i) + "_aggregation";
aggregation_name = fmt::format("aggregation_{}", i);
break;
case tipb::ExecType::TypeTopN:
GET_METRIC(tiflash_coprocessor_executor_count, type_topn).Increment();
assignOrThrowException(&limit_or_topn, &executors[i], TOPN_NAME);
if (executors[i].has_executor_id())
limit_or_topn_name = executors[i].executor_id();
else
limit_or_topn_name = std::to_string(i) + "_limitOrTopN";
limit_or_topn_name = fmt::format("topn_{}", i);
break;
case tipb::ExecType::TypeLimit:
GET_METRIC(tiflash_coprocessor_executor_count, type_limit).Increment();
assignOrThrowException(&limit_or_topn, &executors[i], LIMIT_NAME);
if (executors[i].has_executor_id())
limit_or_topn_name = executors[i].executor_id();
else
limit_or_topn_name = std::to_string(i) + "_limitOrTopN";
limit_or_topn_name = fmt::format("limit_{}", i);
break;
default:
throw TiFlashException(
"Unsupported executor in DAG request: " + executors[i].DebugString(),
fmt::format("Unsupported executor in DAG request: {}", executors[i].DebugString()),
Errors::Coprocessor::Unimplemented);
}
}
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -625,7 +625,10 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
{
TiDBTableScan table_scan(query_block.source, query_block.source_name, dagContext());
if (unlikely(context.isTest()))
{
handleMockTableScan(table_scan, pipeline);
recordProfileStreams(pipeline, query_block.source_name);
}
else
handleTableScan(table_scan, pipeline);
dagContext().table_scan_executor_id = query_block.source_name;
Expand Down
24 changes: 1 addition & 23 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -22,25 +22,6 @@

namespace DB
{
namespace
{
void fillOrderForListBasedExecutors(DAGContext & dag_context, const DAGQueryBlock & query_block)
{
assert(query_block.source);
auto & list_based_executors_order = dag_context.list_based_executors_order;
list_based_executors_order.push_back(query_block.source_name);
if (query_block.selection)
list_based_executors_order.push_back(query_block.selection_name);
if (query_block.aggregation)
list_based_executors_order.push_back(query_block.aggregation_name);
if (query_block.having)
list_based_executors_order.push_back(query_block.having_name);
if (query_block.limit_or_topn)
list_based_executors_order.push_back(query_block.limit_or_topn_name);
if (query_block.exchange_sender)
dag_context.list_based_executors_order.push_back(query_block.exchange_sender_name);
}
} // namespace

DAGQuerySource::DAGQuerySource(Context & context_)
: context(context_)
Expand All @@ -54,9 +35,6 @@ DAGQuerySource::DAGQuerySource(Context & context_)
else
{
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request.executors());
auto & dag_context = getDAGContext();
if (!dag_context.return_executor_id)
fillOrderForListBasedExecutors(dag_context, *root_query_block);
}
}

Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Flash/Coprocessor/ExecutionSummary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Flash/Coprocessor/ExecutionSummary.h>
#include <Flash/Statistics/BaseRuntimeStatistics.h>

namespace DB
{
Expand All @@ -34,6 +35,14 @@ void ExecutionSummary::merge(const tipb::ExecutorExecutionSummary & other)
scan_context->merge(other.tiflash_scan_context());
}

void ExecutionSummary::set(const BaseRuntimeStatistics & other)
{
time_processed_ns = other.execution_time_ns;
num_produced_rows += other.rows;
num_iterations += other.blocks;
concurrency += other.concurrency;
}
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved

void ExecutionSummary::init(const tipb::ExecutorExecutionSummary & other)
{
time_processed_ns = other.time_processed_ns();
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/ExecutionSummary.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

namespace DB
{

struct BaseRuntimeStatistics;
/// do not need be thread safe since it is only used in single thread env
struct ExecutionSummary
{
Expand All @@ -36,6 +38,7 @@ struct ExecutionSummary

void merge(const ExecutionSummary & other);
void merge(const tipb::ExecutorExecutionSummary & other);
void set(const BaseRuntimeStatistics & other);
void init(const tipb::ExecutorExecutionSummary & other);
};

Expand Down
Loading