Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.*: Refine the checking of dag request #7124

Merged
merged 9 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 1 addition & 49 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <Flash/Coprocessor/RequestUtils.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Planner/ExecutorIdGenerator.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Storages/Transaction/TMTContext.h>
#include <kvproto/disaggregated.pb.h>
Expand Down Expand Up @@ -58,15 +57,7 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, TablesRegionsInfo && tab
, warning_count(0)
, keyspace_id(keyspace_id_)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != isTreeBasedExecutors());
const auto & root_executor = isTreeBasedExecutors()
? dag_request->root_executor()
: dag_request->executors(dag_request->executors_size() - 1);
return_executor_id = root_executor.has_executor_id();
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
initListBasedExecutors();
}

// for mpp
Expand All @@ -75,7 +66,6 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & me
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, return_executor_id(true)
, is_mpp_task(true)
, is_root_mpp_task(is_root_mpp_task_)
, flags(dag_request->flags())
Expand All @@ -87,12 +77,9 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & me
, warning_count(0)
, keyspace_id(RequestUtils::deriveKeyspaceID(meta_))
{
RUNTIME_CHECK(isTreeBasedExecutors() && dag_request->root_executor().has_executor_id());
root_executor_id = dag_request->root_executor().executor_id();
// only mpp task has join executor.
initExecutorIdToJoinIdMap();
initOutputInfo();
initListBasedExecutors();
}

// for disaggregated task on write node
Expand All @@ -116,11 +103,7 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const disaggregated::Dis
, warning_count(0)
, keyspace_id(RequestUtils::deriveKeyspaceID(task_meta_))
{
RUNTIME_CHECK(isTreeBasedExecutors() && dag_request->root_executor().has_executor_id());
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
initListBasedExecutors();
}

// for test
Expand Down Expand Up @@ -153,15 +136,7 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, s
, warnings(max_recorded_error_count)
, warning_count(0)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != isTreeBasedExecutors());
const auto & root_executor = isTreeBasedExecutors()
? dag_request->root_executor()
: dag_request->executors(dag_request->executors_size() - 1);
return_executor_id = root_executor.has_executor_id();
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
initListBasedExecutors();
}

void DAGContext::initOutputInfo()
Expand All @@ -182,29 +157,6 @@ void DAGContext::initOutputInfo()
keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock;
}


void DAGContext::initListBasedExecutors()
{
if (!isTreeBasedExecutors())
{
ExecutorIdGenerator id_generator;
for (int i = 0; i < dag_request->executors_size(); ++i)
{
auto * executor = dag_request->mutable_executors(i);
const auto & executor_id = id_generator.generate(*executor);
list_based_executors_order.push_back(executor_id);
// Set executor_id for list based executor,
// then we can fill executor_id for Execution Summaries of list-based executors
executor->set_executor_id(executor_id);
}
}
}

bool DAGContext::isTreeBasedExecutors() const
{
return dag_request->has_root_executor();
}

bool DAGContext::allowZeroInDate() const
{
return flags & TiDBSQLFlags::IGNORE_ZERO_IN_DATE;
Expand Down Expand Up @@ -240,7 +192,7 @@ void DAGContext::initExecutorIdToJoinIdMap()
return;

executor_id_to_join_id_map.clear();
traverseExecutorsReverse(dag_request, [&](const tipb::Executor & executor) {
dag_request.traverseReverse([&](const tipb::Executor & executor) {
std::vector<String> all_join_id;
// for mpp, dag_request.has_root_executor() == true, can call `getChildren` directly.
getChildren(executor).forEach([&](const tipb::Executor & child) {
Expand Down
12 changes: 2 additions & 10 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include <kvproto/mpp.pb.h>
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop

#include <Common/ConcurrentBoundedQueue.h>
#include <Common/Logger.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/DAGRequest.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Mpp/MPPTaskId.h>
Expand Down Expand Up @@ -274,7 +274,7 @@ class DAGContext

KeyspaceID getKeyspaceID() const { return keyspace_id; }

tipb::DAGRequest * dag_request;
DAGRequest dag_request;
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
/// dummy_query_string and dummy_ast is used for that
String dummy_query_string;
Expand All @@ -291,8 +291,6 @@ class DAGContext
// For disaggregated read, this is the host of compute node
String tidb_host = "Unknown";
bool collect_execution_summaries{};
bool return_executor_id{};
String root_executor_id;
/* const */ bool is_mpp_task = false;
/* const */ bool is_root_mpp_task = false;
/* const */ bool is_batch_cop = false;
Expand All @@ -313,10 +311,6 @@ class DAGContext
std::vector<tipb::FieldType> output_field_types;
std::vector<Int32> output_offsets;

/// Hold the order of list based executors.
/// It is used to ensure that the order of Execution summary of list based executors is the same as the order of list based executors.
std::vector<String> list_based_executors_order;

/// executor_id, ScanContextPtr
/// Currently, max(scan_context_map.size()) == 1, because one mpp task only have do one table scan
/// While when we support collcate join later, scan_context_map.size() may > 1,
Expand All @@ -326,8 +320,6 @@ class DAGContext
private:
void initExecutorIdToJoinIdMap();
void initOutputInfo();
void initListBasedExecutors();
bool isTreeBasedExecutors() const;

private:
std::shared_ptr<ProcessListEntry> process_list_entry;
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ namespace DB
DAGQuerySource::DAGQuerySource(Context & context_)
: context(context_)
{
const tipb::DAGRequest & dag_request = *getDAGContext().dag_request;
if (dag_request.has_root_executor())
const auto & dag_request = getDAGContext().dag_request;
if (dag_request.isTreeBased())
{
QueryBlockIDGenerator id_generator;
root_query_block = std::make_shared<DAGQueryBlock>(dag_request.root_executor(), id_generator);
root_query_block = std::make_shared<DAGQueryBlock>(dag_request->root_executor(), id_generator);
}
else
{
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request.executors());
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request->executors());
}
}

Expand Down
173 changes: 173 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGRequest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/DAGRequest.h>
#include <Flash/Statistics/traverseExecutors.h>

namespace DB
{
namespace
{
void check(bool condition, const String & err_msg)
{
if unlikely (!condition)
throw TiFlashException(err_msg, Errors::Coprocessor::BadRequest);
}

class ListBasedExecutorIdGenerator
{
public:
explicit ListBasedExecutorIdGenerator(tipb::DAGRequest * dag_request)
{
assert(dag_request->executors_size() > 0);
// To check duplicate executor_id for list based request.
// Normally list based requests do not set the executor id, but here it is just in case.
for (int i = 0; i < dag_request->executors_size(); ++i)
{
const auto & executor = dag_request->executors(i);
if (executor.has_executor_id())
{
const auto & executor_id = executor.executor_id();
check(executor_id_set.find(executor_id) == executor_id_set.end(), fmt::format("in list based request, executor id `{}` duplicate, which is unexpected.", executor_id));
executor_id_set.insert(executor_id);
}
}
}

String generate(const tipb::Executor & executor)
{
// Has checked in constructor.
if (executor.has_executor_id())
return executor.executor_id();

for (size_t i = 0; i < 5; ++i)
{
auto gen_id = doGenerate(executor);
if (executor_id_set.find(gen_id) == executor_id_set.end())
{
executor_id_set.insert(gen_id);
return gen_id;
}
}
throw Exception(fmt::format("We have failed five times to generate a unique id for list base executor, exists ids are: [{}]", fmt::join(executor_id_set, ",")));
}

private:
String doGenerate(const tipb::Executor & executor)
{
assert(!executor.has_executor_id());
switch (executor.tp())
{
case tipb::ExecType::TypeSelection:
return fmt::format("selection_{}", current_id++);
case tipb::ExecType::TypeProjection:
return fmt::format("project_{}", current_id++);
case tipb::ExecType::TypeStreamAgg:
case tipb::ExecType::TypeAggregation:
return fmt::format("aggregation_{}", current_id++);
case tipb::ExecType::TypeTopN:
return fmt::format("topn_{}", current_id++);
case tipb::ExecType::TypeLimit:
return fmt::format("limit_{}", current_id++);
case tipb::ExecType::TypeExchangeSender:
return fmt::format("exchange_sender_{}", current_id++);
case tipb::ExecType::TypeExchangeReceiver:
return fmt::format("exchange_receiver_{}", current_id++);
case tipb::ExecType::TypeTableScan:
case tipb::ExecType::TypePartitionTableScan:
return fmt::format("table_scan_{}", current_id++);
case tipb::ExecType::TypeSort:
return fmt::format("sort_{}", current_id++);
case tipb::ExecType::TypeWindow:
return fmt::format("window_{}", current_id++);
case tipb::ExecType::TypeJoin:
return fmt::format("join_{}", current_id++);
case tipb::ExecType::TypeExpand:
return fmt::format("expand_{}", current_id++);
default:
throw TiFlashException(
fmt::format("Unsupported executor in list based DAG request: {}", executor.DebugString()),
Errors::Coprocessor::Unimplemented);
}
}

private:
UInt32 current_id = 0;
std::unordered_set<String> executor_id_set;
};
} // namespace

DAGRequest::DAGRequest(tipb::DAGRequest * dag_request_)
: dag_request(dag_request_)
{
// Will only occur in tests.
if unlikely (!dag_request)
return;

check((dag_request->executors_size() > 0) != dag_request->has_root_executor(), "dagrequest must be one of list based and tree based");
is_tree_based = dag_request->has_root_executor();

checkOrSetExecutorId();
}

void DAGRequest::checkOrSetExecutorId()
{
if (is_tree_based)
{
// check duplicate executor_id for tree based request.
std::unordered_set<String> ids;
traverseExecutorTree(dag_request->root_executor(), [&](const tipb::Executor & executor) {
check(executor.has_executor_id(), "for tree based request, executor id cannot be null");
const auto & executor_id = executor.executor_id();
check(ids.find(executor_id) == ids.end(), fmt::format("in tree based request, executor id `{}` duplicate, which is unexpected.", executor_id));
ids.insert(executor_id);
return true;
});
}
else
{
// generate executor_id for list based request.
ListBasedExecutorIdGenerator id_generator{dag_request};
for (int i = 0; i < dag_request->executors_size(); ++i)
{
auto * executor = dag_request->mutable_executors(i);
const auto & executor_id = id_generator.generate(*executor);
// Set executor_id for list based executor,
// then we can fill executor_id for Execution Summaries of list-based executors
executor->set_executor_id(executor_id);
// The begin of list_based_executors_order is the leaf node like table scan.
list_based_executors_order.push_back(executor_id);
}
}
}

const tipb::Executor & DAGRequest::rootExecutor() const
{
check(dag_request, "dagrequest can't be null");
return isTreeBased() ? dag_request->root_executor() : dag_request->executors(dag_request->executors_size() - 1);
}

void DAGRequest::traverse(std::function<bool(const tipb::Executor &)> && func) const
{
check(dag_request, "dagrequest can't be null");
traverseExecutors(dag_request, std::move(func));
}

void DAGRequest::traverseReverse(std::function<void(const tipb::Executor &)> && func) const
{
check(dag_request, "dagrequest can't be null");
traverseExecutorsReverse(dag_request, std::move(func));
}
} // namespace DB
Loading