Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Refine execution summary with ExecutorStatisticsCollector #6995

Merged
merged 27 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,14 +14,15 @@

#include <Debug/MockExecutor/AstToPBUtils.h>
#include <Debug/dbgQueryExecutor.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/CoprocessorHandler.h>
#include <Flash/Mpp/MPPTask.h>
#include <Interpreters/Context.h>
#include <Server/MockComputeClient.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>
#include <TestUtils/TiFlashTestEnv.h>

namespace DB
{
using TiFlashTestEnv = tests::TiFlashTestEnv;
Expand Down Expand Up @@ -299,7 +300,7 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA
}
}

tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version, Timestamp start_ts, std::vector<std::pair<DecodedTiKVKeyPtr, DecodedTiKVKeyPtr>> & key_ranges)
tipb::SelectResponse executeDAGRequest(Context & context, tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version, Timestamp start_ts, std::vector<std::pair<DecodedTiKVKeyPtr, DecodedTiKVKeyPtr>> & key_ranges)
{
static auto log = Logger::get();
LOG_DEBUG(log, "Handling DAG request: {}", dag_request.DebugString());
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Debug/dbgQueryExecutor.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@
namespace DB
{
using MockServerConfig = tests::MockServerConfig;
struct DecodedTiKVKey;
using DecodedTiKVKeyPtr = std::shared_ptr<DecodedTiKVKey>;

BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DAGProperties & properties, QueryTasks & query_tasks, MakeResOutputStream & func_wrap_output_stream);
BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & properties, QueryTasks & query_tasks);
Expand All @@ -26,7 +28,7 @@ std::vector<BlockInputStreamPtr> executeMPPQueryWithMultipleContext(const DAGPro

tipb::SelectResponse executeDAGRequest(
Context & context,
const tipb::DAGRequest & dag_request,
tipb::DAGRequest & dag_request,
RegionID region_id,
UInt64 region_version,
UInt64 region_conf_version,
Expand Down
55 changes: 37 additions & 18 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,10 @@
#include <Flash/Coprocessor/RequestUtils.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Planner/ExecutorIdGenerator.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Storages/Transaction/TMTContext.h>
#include <tipb/executor.pb.h>

namespace DB
{
Expand All @@ -37,7 +39,7 @@ bool strictSqlMode(UInt64 sql_mode)
}

// for non-mpp(cop/batchCop)
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
DAGContext::DAGContext(tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
Expand All @@ -55,18 +57,19 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo
, warning_count(0)
, keyspace_id(keyspace_id_)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != dag_request->has_root_executor());
const auto & root_executor = dag_request->has_root_executor()
RUNTIME_CHECK((dag_request->executors_size() > 0) != isTreeBasedExecutors());
const auto & root_executor = isTreeBasedExecutors()
? dag_request->root_executor()
: dag_request->executors(dag_request->executors_size() - 1);
return_executor_id = root_executor.has_executor_id();
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
initListBasedExecutors();
}
SeaRise marked this conversation as resolved.
Show resolved Hide resolved

// for mpp
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
Expand All @@ -83,14 +86,15 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMet
, warning_count(0)
, keyspace_id(RequestUtils::deriveKeyspaceID(meta_))
{
RUNTIME_CHECK(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());
RUNTIME_CHECK(isTreeBasedExecutors() && dag_request->root_executor().has_executor_id());
root_executor_id = dag_request->root_executor().executor_id();
// only mpp task has join executor.
initExecutorIdToJoinIdMap();
initOutputInfo();
initListBasedExecutors();
}

DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const DM::DisaggTaskId & task_id_, TablesRegionsInfo && tables_regions_info_, const String & compute_node_host_, LoggerPtr log_)
DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const DM::DisaggTaskId & task_id_, TablesRegionsInfo && tables_regions_info_, const String & compute_node_host_, LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
Expand All @@ -109,10 +113,11 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const DM::DisaggTa
, warnings(max_recorded_error_count)
, warning_count(0)
{
RUNTIME_CHECK(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());
RUNTIME_CHECK(isTreeBasedExecutors() && dag_request->root_executor().has_executor_id());
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
initListBasedExecutors();
}

// for test
Expand All @@ -130,7 +135,7 @@ DAGContext::DAGContext(UInt64 max_error_count_)
{}

// for tests need to run query tasks.
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency)
DAGContext::DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
Expand All @@ -145,14 +150,15 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, String log_identif
, warnings(max_recorded_error_count)
, warning_count(0)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != dag_request->has_root_executor());
const auto & root_executor = dag_request->has_root_executor()
RUNTIME_CHECK((dag_request->executors_size() > 0) != isTreeBasedExecutors());
const auto & root_executor = isTreeBasedExecutors()
? dag_request->root_executor()
: dag_request->executors(dag_request->executors_size() - 1);
return_executor_id = root_executor.has_executor_id();
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
initListBasedExecutors();
}

void DAGContext::initOutputInfo()
Expand All @@ -173,14 +179,27 @@ void DAGContext::initOutputInfo()
keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock;
}

String DAGContext::getRootExecutorId()

void DAGContext::initListBasedExecutors()
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
{
if (!isTreeBasedExecutors())
{
ExecutorIdGenerator id_generator;
for (int i = 0; i < dag_request->executors_size(); ++i)
{
auto * executor = dag_request->mutable_executors(i);
const auto & executor_id = id_generator.generate(*executor);
list_based_executors_order.push_back(executor_id);
// Set executor_id for list based executor,
// then we can fill executor_id for Execution Summaries of list-based executors
executor->set_executor_id(executor_id);
}
}
}

bool DAGContext::isTreeBasedExecutors() const
{
// If return_executor_id is false, we can get the generated executor_id from list_based_executors_order.
return return_executor_id
? root_executor_id
: (list_based_executors_order.empty()
? ""
: list_based_executors_order.back());
return dag_request->has_root_executor();
}

bool DAGContext::allowZeroInDate() const
Expand Down
15 changes: 8 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,19 +128,19 @@ class DAGContext
{
public:
// for non-mpp(cop/batchCop)
DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_);
DAGContext(tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_);

// for mpp
DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_);
DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_);

// for disaggregated task on write node
DAGContext(const tipb::DAGRequest & dag_request_, const DM::DisaggTaskId & task_id_, TablesRegionsInfo && tables_regions_info_, const String & compute_node_host_, LoggerPtr log_);
DAGContext(tipb::DAGRequest & dag_request_, const DM::DisaggTaskId & task_id_, TablesRegionsInfo && tables_regions_info_, const String & compute_node_host_, LoggerPtr log_);

// for test
explicit DAGContext(UInt64 max_error_count_);

// for tests need to run query tasks.
DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency);
DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency);

std::unordered_map<String, BlockInputStreams> & getProfileStreamsMap();

Expand Down Expand Up @@ -273,9 +273,8 @@ class DAGContext
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

KeyspaceID getKeyspaceID() const { return keyspace_id; }
String getRootExecutorId();

const tipb::DAGRequest * dag_request;
tipb::DAGRequest * dag_request;
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
/// dummy_query_string and dummy_ast is used for that
String dummy_query_string;
Expand Down Expand Up @@ -327,6 +326,8 @@ class DAGContext
private:
void initExecutorIdToJoinIdMap();
void initOutputInfo();
void initListBasedExecutors();
bool isTreeBasedExecutors() const;

private:
std::shared_ptr<ProcessListEntry> process_list_entry;
Expand Down
12 changes: 7 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/ExecutionSummaryCollector.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
#include <Flash/Statistics/ExecutorStatisticsCollector.h>
#include <Flash/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
Expand Down Expand Up @@ -114,8 +114,9 @@ try

if (dag_context.collect_execution_summaries)
{
ExecutionSummaryCollector summary_collector(dag_context);
summary_collector.addExecuteSummaries(*dag_response);
ExecutorStatisticsCollector statistics_collector(log->identifier());
statistics_collector.initialize(&dag_context);
statistics_collector.fillExecuteSummaries(*dag_response);
}
}
else
Expand Down Expand Up @@ -146,8 +147,9 @@ try

if (dag_context.collect_execution_summaries)
{
ExecutionSummaryCollector summary_collector(dag_context);
auto execution_summary_response = summary_collector.genExecutionSummaryResponse();
ExecutorStatisticsCollector statistics_collector(log->identifier());
statistics_collector.initialize(&dag_context);
auto execution_summary_response = statistics_collector.genExecutionSummaryResponse();
streaming_writer->write(execution_summary_response);
}
}
Expand Down
30 changes: 8 additions & 22 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -187,6 +187,7 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrFi
{
for (int i = executors.size() - 1; i >= 0; i--)
{
assert(executors[i].has_executor_id());
switch (executors[i].tp())
{
case tipb::ExecType::TypeTableScan:
Expand All @@ -196,48 +197,33 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrFi
/// the executor name, it will result in the same order as it is
/// in the dag_request, this is needed when filling execution_summary
/// in DAGDriver
if (executors[i].has_executor_id())
source_name = executors[i].executor_id();
else
source_name = std::to_string(i) + "_tablescan";
source_name = executors[i].executor_id();
break;
case tipb::ExecType::TypeSelection:
GET_METRIC(tiflash_coprocessor_executor_count, type_sel).Increment();
assignOrThrowException(&selection, &executors[i], SEL_NAME);
if (executors[i].has_executor_id())
selection_name = executors[i].executor_id();
else
selection_name = std::to_string(i) + "_selection";
selection_name = executors[i].executor_id();
break;
case tipb::ExecType::TypeStreamAgg:
RUNTIME_CHECK_MSG(executors[i].aggregation().group_by_size() == 0, STREAM_AGG_ERROR);
case tipb::ExecType::TypeAggregation:
GET_METRIC(tiflash_coprocessor_executor_count, type_agg).Increment();
assignOrThrowException(&aggregation, &executors[i], AGG_NAME);
if (executors[i].has_executor_id())
aggregation_name = executors[i].executor_id();
else
aggregation_name = std::to_string(i) + "_aggregation";
aggregation_name = executors[i].executor_id();
break;
case tipb::ExecType::TypeTopN:
GET_METRIC(tiflash_coprocessor_executor_count, type_topn).Increment();
assignOrThrowException(&limit_or_topn, &executors[i], TOPN_NAME);
if (executors[i].has_executor_id())
limit_or_topn_name = executors[i].executor_id();
else
limit_or_topn_name = std::to_string(i) + "_limitOrTopN";
limit_or_topn_name = executors[i].executor_id();
break;
case tipb::ExecType::TypeLimit:
GET_METRIC(tiflash_coprocessor_executor_count, type_limit).Increment();
assignOrThrowException(&limit_or_topn, &executors[i], LIMIT_NAME);
if (executors[i].has_executor_id())
limit_or_topn_name = executors[i].executor_id();
else
limit_or_topn_name = std::to_string(i) + "_limitOrTopN";
limit_or_topn_name = executors[i].executor_id();
break;
default:
throw TiFlashException(
"Unsupported executor in DAG request: " + executors[i].DebugString(),
fmt::format("Unsupported executor in DAG request: {}", executors[i].DebugString()),
Errors::Coprocessor::Unimplemented);
}
}
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <DataStreams/WindowBlockInputStream.h>
#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGUtils.h>
Expand Down Expand Up @@ -625,7 +626,10 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
{
TiDBTableScan table_scan(query_block.source, query_block.source_name, dagContext());
if (unlikely(context.isTest()))
{
handleMockTableScan(table_scan, pipeline);
recordProfileStreams(pipeline, query_block.source_name);
}
else
handleTableScan(table_scan, pipeline);
dagContext().table_scan_executor_id = query_block.source_name;
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,6 +37,9 @@ namespace DB
class DAGQueryBlock;
class ExchangeReceiver;
class DAGExpressionAnalyzer;
struct SubqueryForSet;
class Join;
using JoinPtr = std::shared_ptr<Join>;

/** build ch plan from dag request: dag executors -> ch plan
*/
Expand Down
Loading