Skip to content

Commit

Permalink
*: Refine execution summary with ExecutorStatisticsCollector (#6995)
Browse files Browse the repository at this point in the history
ref #5900, ref #6518
  • Loading branch information
ywqzzy authored Mar 21, 2023
1 parent faee282 commit 91f4896
Show file tree
Hide file tree
Showing 57 changed files with 810 additions and 535 deletions.
7 changes: 4 additions & 3 deletions dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,14 +14,15 @@

#include <Debug/MockExecutor/AstToPBUtils.h>
#include <Debug/dbgQueryExecutor.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/CoprocessorHandler.h>
#include <Flash/Mpp/MPPTask.h>
#include <Interpreters/Context.h>
#include <Server/MockComputeClient.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>
#include <TestUtils/TiFlashTestEnv.h>

namespace DB
{
using TiFlashTestEnv = tests::TiFlashTestEnv;
Expand Down Expand Up @@ -299,7 +300,7 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA
}
}

tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version, Timestamp start_ts, std::vector<std::pair<DecodedTiKVKeyPtr, DecodedTiKVKeyPtr>> & key_ranges)
tipb::SelectResponse executeDAGRequest(Context & context, tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version, Timestamp start_ts, std::vector<std::pair<DecodedTiKVKeyPtr, DecodedTiKVKeyPtr>> & key_ranges)
{
static auto log = Logger::get();
LOG_DEBUG(log, "Handling DAG request: {}", dag_request.DebugString());
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Debug/dbgQueryExecutor.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@
namespace DB
{
using MockServerConfig = tests::MockServerConfig;
struct DecodedTiKVKey;
using DecodedTiKVKeyPtr = std::shared_ptr<DecodedTiKVKey>;

BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DAGProperties & properties, QueryTasks & query_tasks, MakeResOutputStream & func_wrap_output_stream);
BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & properties, QueryTasks & query_tasks);
Expand All @@ -26,7 +28,7 @@ std::vector<BlockInputStreamPtr> executeMPPQueryWithMultipleContext(const DAGPro

tipb::SelectResponse executeDAGRequest(
Context & context,
const tipb::DAGRequest & dag_request,
tipb::DAGRequest & dag_request,
RegionID region_id,
UInt64 region_version,
UInt64 region_conf_version,
Expand Down
55 changes: 37 additions & 18 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,10 @@
#include <Flash/Coprocessor/RequestUtils.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Planner/ExecutorIdGenerator.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Storages/Transaction/TMTContext.h>
#include <tipb/executor.pb.h>

namespace DB
{
Expand All @@ -37,7 +39,7 @@ bool strictSqlMode(UInt64 sql_mode)
}

// for non-mpp(cop/batchCop)
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
DAGContext::DAGContext(tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
Expand All @@ -55,18 +57,19 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo
, warning_count(0)
, keyspace_id(keyspace_id_)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != dag_request->has_root_executor());
const auto & root_executor = dag_request->has_root_executor()
RUNTIME_CHECK((dag_request->executors_size() > 0) != isTreeBasedExecutors());
const auto & root_executor = isTreeBasedExecutors()
? dag_request->root_executor()
: dag_request->executors(dag_request->executors_size() - 1);
return_executor_id = root_executor.has_executor_id();
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
initListBasedExecutors();
}

// for mpp
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
Expand All @@ -83,14 +86,15 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMet
, warning_count(0)
, keyspace_id(RequestUtils::deriveKeyspaceID(meta_))
{
RUNTIME_CHECK(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());
RUNTIME_CHECK(isTreeBasedExecutors() && dag_request->root_executor().has_executor_id());
root_executor_id = dag_request->root_executor().executor_id();
// Only MPP tasks have join executors.
initExecutorIdToJoinIdMap();
initOutputInfo();
initListBasedExecutors();
}

DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const DM::DisaggTaskId & task_id_, TablesRegionsInfo && tables_regions_info_, const String & compute_node_host_, LoggerPtr log_)
DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const DM::DisaggTaskId & task_id_, TablesRegionsInfo && tables_regions_info_, const String & compute_node_host_, LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
Expand All @@ -109,10 +113,11 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const DM::DisaggTa
, warnings(max_recorded_error_count)
, warning_count(0)
{
RUNTIME_CHECK(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());
RUNTIME_CHECK(isTreeBasedExecutors() && dag_request->root_executor().has_executor_id());
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
initListBasedExecutors();
}

// for test
Expand All @@ -130,7 +135,7 @@ DAGContext::DAGContext(UInt64 max_error_count_)
{}

// for tests need to run query tasks.
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency)
DAGContext::DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
Expand All @@ -145,14 +150,15 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, String log_identif
, warnings(max_recorded_error_count)
, warning_count(0)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != dag_request->has_root_executor());
const auto & root_executor = dag_request->has_root_executor()
RUNTIME_CHECK((dag_request->executors_size() > 0) != isTreeBasedExecutors());
const auto & root_executor = isTreeBasedExecutors()
? dag_request->root_executor()
: dag_request->executors(dag_request->executors_size() - 1);
return_executor_id = root_executor.has_executor_id();
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
initListBasedExecutors();
}

void DAGContext::initOutputInfo()
Expand All @@ -173,14 +179,27 @@ void DAGContext::initOutputInfo()
keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock;
}

String DAGContext::getRootExecutorId()

/// Assigns an executor id to every executor of a list-based DAG request.
///
/// Does nothing when the request is tree-based (see isTreeBasedExecutors()).
/// Otherwise each entry of dag_request->executors() gets an id produced by a
/// single ExecutorIdGenerator; the id is appended to list_based_executors_order
/// (in request order) and written back into the request itself.
void DAGContext::initListBasedExecutors()
{
    if (isTreeBasedExecutors())
        return;

    ExecutorIdGenerator id_generator;
    const int executor_count = dag_request->executors_size();
    for (int idx = 0; idx < executor_count; ++idx)
    {
        auto * current = dag_request->mutable_executors(idx);
        const auto & generated_id = id_generator.generate(*current);
        list_based_executors_order.push_back(generated_id);
        // Store the id on the executor in the request as well, so that the
        // execution summaries of list-based executors can be filled with it.
        current->set_executor_id(generated_id);
    }
}

bool DAGContext::isTreeBasedExecutors() const
{
// If return_executor_id is false, we can get the generated executor_id from list_based_executors_order.
return return_executor_id
? root_executor_id
: (list_based_executors_order.empty()
? ""
: list_based_executors_order.back());
return dag_request->has_root_executor();
}

bool DAGContext::allowZeroInDate() const
Expand Down
15 changes: 8 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,19 +128,19 @@ class DAGContext
{
public:
// for non-mpp(cop/batchCop)
DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_);
DAGContext(tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_);

// for mpp
DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_);
DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_);

// for disaggregated task on write node
DAGContext(const tipb::DAGRequest & dag_request_, const DM::DisaggTaskId & task_id_, TablesRegionsInfo && tables_regions_info_, const String & compute_node_host_, LoggerPtr log_);
DAGContext(tipb::DAGRequest & dag_request_, const DM::DisaggTaskId & task_id_, TablesRegionsInfo && tables_regions_info_, const String & compute_node_host_, LoggerPtr log_);

// for test
explicit DAGContext(UInt64 max_error_count_);

// for tests that need to run query tasks.
DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency);
DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency);

std::unordered_map<String, BlockInputStreams> & getProfileStreamsMap();

Expand Down Expand Up @@ -273,9 +273,8 @@ class DAGContext
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

KeyspaceID getKeyspaceID() const { return keyspace_id; }
String getRootExecutorId();

const tipb::DAGRequest * dag_request;
tipb::DAGRequest * dag_request;
/// Some existing code inherited from ClickHouse assumes that each query must have a valid query string and query AST;
/// dummy_query_string and dummy_ast are used for that.
String dummy_query_string;
Expand Down Expand Up @@ -327,6 +326,8 @@ class DAGContext
private:
void initExecutorIdToJoinIdMap();
void initOutputInfo();
void initListBasedExecutors();
bool isTreeBasedExecutors() const;

private:
std::shared_ptr<ProcessListEntry> process_list_entry;
Expand Down
12 changes: 7 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/ExecutionSummaryCollector.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
#include <Flash/Statistics/ExecutorStatisticsCollector.h>
#include <Flash/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
Expand Down Expand Up @@ -114,8 +114,9 @@ try

if (dag_context.collect_execution_summaries)
{
ExecutionSummaryCollector summary_collector(dag_context);
summary_collector.addExecuteSummaries(*dag_response);
ExecutorStatisticsCollector statistics_collector(log->identifier());
statistics_collector.initialize(&dag_context);
statistics_collector.fillExecuteSummaries(*dag_response);
}
}
else
Expand Down Expand Up @@ -146,8 +147,9 @@ try

if (dag_context.collect_execution_summaries)
{
ExecutionSummaryCollector summary_collector(dag_context);
auto execution_summary_response = summary_collector.genExecutionSummaryResponse();
ExecutorStatisticsCollector statistics_collector(log->identifier());
statistics_collector.initialize(&dag_context);
auto execution_summary_response = statistics_collector.genExecutionSummaryResponse();
streaming_writer->write(execution_summary_response);
}
}
Expand Down
30 changes: 8 additions & 22 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -187,6 +187,7 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrFi
{
for (int i = executors.size() - 1; i >= 0; i--)
{
assert(executors[i].has_executor_id());
switch (executors[i].tp())
{
case tipb::ExecType::TypeTableScan:
Expand All @@ -196,48 +197,33 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrFi
/// the executor name, it will result in the same order as it is
/// in the dag_request, this is needed when filling execution_summary
/// in DAGDriver
if (executors[i].has_executor_id())
source_name = executors[i].executor_id();
else
source_name = std::to_string(i) + "_tablescan";
source_name = executors[i].executor_id();
break;
case tipb::ExecType::TypeSelection:
GET_METRIC(tiflash_coprocessor_executor_count, type_sel).Increment();
assignOrThrowException(&selection, &executors[i], SEL_NAME);
if (executors[i].has_executor_id())
selection_name = executors[i].executor_id();
else
selection_name = std::to_string(i) + "_selection";
selection_name = executors[i].executor_id();
break;
case tipb::ExecType::TypeStreamAgg:
RUNTIME_CHECK_MSG(executors[i].aggregation().group_by_size() == 0, STREAM_AGG_ERROR);
case tipb::ExecType::TypeAggregation:
GET_METRIC(tiflash_coprocessor_executor_count, type_agg).Increment();
assignOrThrowException(&aggregation, &executors[i], AGG_NAME);
if (executors[i].has_executor_id())
aggregation_name = executors[i].executor_id();
else
aggregation_name = std::to_string(i) + "_aggregation";
aggregation_name = executors[i].executor_id();
break;
case tipb::ExecType::TypeTopN:
GET_METRIC(tiflash_coprocessor_executor_count, type_topn).Increment();
assignOrThrowException(&limit_or_topn, &executors[i], TOPN_NAME);
if (executors[i].has_executor_id())
limit_or_topn_name = executors[i].executor_id();
else
limit_or_topn_name = std::to_string(i) + "_limitOrTopN";
limit_or_topn_name = executors[i].executor_id();
break;
case tipb::ExecType::TypeLimit:
GET_METRIC(tiflash_coprocessor_executor_count, type_limit).Increment();
assignOrThrowException(&limit_or_topn, &executors[i], LIMIT_NAME);
if (executors[i].has_executor_id())
limit_or_topn_name = executors[i].executor_id();
else
limit_or_topn_name = std::to_string(i) + "_limitOrTopN";
limit_or_topn_name = executors[i].executor_id();
break;
default:
throw TiFlashException(
"Unsupported executor in DAG request: " + executors[i].DebugString(),
fmt::format("Unsupported executor in DAG request: {}", executors[i].DebugString()),
Errors::Coprocessor::Unimplemented);
}
}
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <DataStreams/WindowBlockInputStream.h>
#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGUtils.h>
Expand Down Expand Up @@ -628,7 +629,10 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
{
TiDBTableScan table_scan(query_block.source, query_block.source_name, dagContext());
if (unlikely(context.isTest()))
{
handleMockTableScan(table_scan, pipeline);
recordProfileStreams(pipeline, query_block.source_name);
}
else
handleTableScan(table_scan, pipeline);
dagContext().table_scan_executor_id = query_block.source_name;
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,6 +37,9 @@ namespace DB
class DAGQueryBlock;
class ExchangeReceiver;
class DAGExpressionAnalyzer;
struct SubqueryForSet;
class Join;
using JoinPtr = std::shared_ptr<Join>;

/** build ch plan from dag request: dag executors -> ch plan
*/
Expand Down
Loading

0 comments on commit 91f4896

Please sign in to comment.