Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce physical plan and add switch #4820

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/license-checker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ name: License checker
on:
push:
branches:
- master
- planner_refactory
pull_request:
branches:
- master
- planner_refactory
Copy link
Contributor Author

@SeaRise SeaRise Jun 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified for license checker.
I'll revert it to master at the last pr


jobs:
check-license:
Expand Down
17 changes: 17 additions & 0 deletions dbms/src/Common/TiFlashException.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,23 @@ namespace DB
"This error usually occurs when the TiFlash server is busy or the TiFlash node is down.\n", \
""); \
) \
C(Planner, \
E(BadRequest, "Bad TiDB DAGRequest.", \
"This error is usually caused by incorrect TiDB DAGRequest. \n" \
"Please contact with developer, \n" \
"better providing information about your cluster(log, topology information etc.).", \
""); \
E(Unimplemented, "Some features are unimplemented.", \
"This error may caused by unmatched TiDB and TiFlash versions, \n" \
"and should not occur in common case. \n" \
"Please contact with developer, \n" \
"better providing information about your cluster(log, topology information etc.).", \
""); \
E(Internal, "TiFlash Planner internal error.", \
"Please contact with developer, \n" \
"better providing information about your cluster(log, topology information etc.).", \
""); \
) \
C(Table, \
E(SchemaVersionError, "Schema version of target table in TiFlash is different from that in query.", \
"TiFlash will sync the newest schema from TiDB before processing every query. \n" \
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ include(${TiFlash_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(flash_service .)
add_headers_and_sources(flash_service ./Coprocessor)
add_headers_and_sources(flash_service ./Mpp)
add_headers_and_sources(flash_service ./Planner)
add_headers_and_sources(flash_service ./Planner/plans)
add_headers_and_sources(flash_service ./Statistics)
add_headers_and_sources(flash_service ./Management)

Expand Down
25 changes: 19 additions & 6 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Planner/Planner.h>
#include <Interpreters/Context.h>

namespace DB
Expand Down Expand Up @@ -65,12 +66,24 @@ BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block)
BlockInputStreams child_streams = executeQueryBlock(*child);
input_streams_vec.push_back(child_streams);
}
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams);
return query_block_interpreter.execute();
if (context.getSettingsRef().enable_planner && Planner::isSupported(query_block))
{
Planner planner(
windtalker marked this conversation as resolved.
Show resolved Hide resolved
context,
input_streams_vec,
query_block,
max_streams);
return planner.execute();
}
else
{
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams);
return query_block_interpreter.execute();
}
}

BlockIO InterpreterDAG::execute()
Expand Down
73 changes: 73 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FmtUtils.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Planner/PhysicalPlan.h>
#include <Flash/Planner/PhysicalPlanHelper.h>
#include <Interpreters/Context.h>

namespace DB
{
PhysicalPlan::PhysicalPlan(
const String & executor_id_,
const PlanType & type_,
const NamesAndTypes & schema_,
const String & req_id)
: executor_id(executor_id_)
, type(type_)
, schema(schema_)
, log(Logger::get(type_.toString(), req_id))
{}

String PhysicalPlan::toString()
{
auto schema_to_string = [&]() {
FmtBuffer buffer;
buffer.joinStr(
schema.cbegin(),
schema.cend(),
[](const auto & item, FmtBuffer & buf) { buf.fmtAppend("<{}, {}>", item.name, item.type->getName()); },
", ");
return buffer.toString();
};
return fmt::format(
"type: {}, executor_id: {}, is_record_profile_streams: {}, schema: {}",
type.toString(),
executor_id,
is_record_profile_streams,
schema_to_string());
}

void PhysicalPlan::finalize()
{
finalize(PhysicalPlanHelper::schemaToNames(schema));
}

void PhysicalPlan::recordProfileStreams(DAGPipeline & pipeline, const Context & context)
{
if (is_record_profile_streams)
{
auto & profile_streams = context.getDAGContext()->getProfileStreamsMap()[executor_id];
pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); });
}
}

void PhysicalPlan::transform(DAGPipeline & pipeline, Context & context, size_t max_streams)
{
transformImpl(pipeline, context, max_streams);
recordProfileStreams(pipeline, context);
}
} // namespace DB
83 changes: 83 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlan.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Logger.h>
#include <Core/Block.h>
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <Flash/Planner/PlanType.h>

#include <memory>

namespace DB
{
struct DAGPipeline;
class Context;
class DAGContext;

class PhysicalPlan;
using PhysicalPlanPtr = std::shared_ptr<PhysicalPlan>;

class PhysicalPlan
{
public:
PhysicalPlan(
const String & executor_id_,
const PlanType & type_,
const NamesAndTypes & schema_,
const String & req_id);

virtual ~PhysicalPlan() = default;

virtual PhysicalPlanPtr children(size_t /*i*/) const = 0;

virtual void setChild(size_t /*i*/, const PhysicalPlanPtr & /*new_child*/) = 0;

const PlanType & tp() const { return type; }

const String & execId() const { return executor_id; }

const NamesAndTypes & getSchema() const { return schema; }

virtual void appendChild(const PhysicalPlanPtr & /*new_child*/) = 0;

virtual size_t childrenSize() const = 0;

virtual void transform(DAGPipeline & pipeline, Context & context, size_t max_streams);

virtual void finalize(const Names & parent_require) = 0;
void finalize();

/// Obtain a sample block that contains the names and types of result columns.
virtual const Block & getSampleBlock() const = 0;

void disableRecordProfileStreams() { is_record_profile_streams = false; }

String toString();

protected:
virtual void transformImpl(DAGPipeline & /*pipeline*/, Context & /*context*/, size_t /*max_streams*/){};

void recordProfileStreams(DAGPipeline & pipeline, const Context & context);

String executor_id;
PlanType type;
NamesAndTypes schema;
bool is_record_profile_streams = true;

LoggerPtr log;
};
} // namespace DB
24 changes: 24 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Planner/PhysicalPlanBuilder.h>
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
#include <Flash/Planner/plans/PhysicalSource.h>

namespace DB
{
void PhysicalPlanBuilder::buildSource(const Block & sample_block)
{
cur_plans.push_back(PhysicalSource::build(sample_block, log->identifier()));
}
} // namespace DB
47 changes: 47 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanBuilder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Flash/Planner/PhysicalPlan.h>
#include <common/logger_useful.h>

namespace DB
{
class PhysicalPlanBuilder
{
public:
explicit PhysicalPlanBuilder(Context & context_, const String & req_id)
: context(context_)
, log(Logger::get("PhysicalPlanBuilder", req_id))
{}

void buildSource(const Block & sample_block);

PhysicalPlanPtr getResult() const
{
RUNTIME_ASSERT(cur_plans.size() == 1, log, "There can only be one plan output, but here are {}", cur_plans.size());
return cur_plans.back();
}

private:
std::vector<PhysicalPlanPtr> cur_plans;

[[maybe_unused]] Context & context;

LoggerPtr log;
};
} // namespace DB
27 changes: 27 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanHelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Planner/PhysicalPlanHelper.h>

namespace DB::PhysicalPlanHelper
{
Names schemaToNames(const NamesAndTypes & schema)
{
Names names;
names.reserve(schema.size());
for (const auto & column : schema)
names.push_back(column.name);
return names;
}
} // namespace DB::PhysicalPlanHelper
22 changes: 22 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanHelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Core/NamesAndTypes.h>

namespace DB::PhysicalPlanHelper
{
Names schemaToNames(const NamesAndTypes & schema);
} // namespace DB::PhysicalPlanHelper
30 changes: 30 additions & 0 deletions dbms/src/Flash/Planner/PlanType.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashException.h>
#include <Flash/Planner/PlanType.h>

namespace DB
{
String PlanType::toString() const
{
switch (enum_value)
{
case Source:
return "Source";
default:
throw TiFlashException("Unknown PlanType", Errors::Planner::Internal);
}
}
} // namespace DB
Loading