From aabe213dac82cf5570f5680bfddec61c47e1a7b6 Mon Sep 17 00:00:00 2001
From: SeaRise
Date: Mon, 13 Jun 2022 21:44:32 +0800
Subject: [PATCH] Introduce physical plan and add switch (#4820)

ref pingcap/tiflash#4739
---
 .github/workflows/license-checker.yml         |   4 +-
 dbms/src/Common/TiFlashException.h            |  17 +
 dbms/src/Flash/CMakeLists.txt                 |   2 +
 dbms/src/Flash/Coprocessor/InterpreterDAG.cpp |  25 +-
 dbms/src/Flash/Planner/PhysicalPlan.cpp       |  73 ++
 dbms/src/Flash/Planner/PhysicalPlan.h         |  83 ++
 .../src/Flash/Planner/PhysicalPlanBuilder.cpp |  24 +
 dbms/src/Flash/Planner/PhysicalPlanBuilder.h  |  47 ++
 dbms/src/Flash/Planner/PhysicalPlanHelper.cpp |  27 +
 dbms/src/Flash/Planner/PhysicalPlanHelper.h   |  22 +
 dbms/src/Flash/Planner/PlanType.cpp           |  30 +
 dbms/src/Flash/Planner/PlanType.h             |  46 ++
 dbms/src/Flash/Planner/Planner.cpp            |  78 ++
 dbms/src/Flash/Planner/Planner.h              |  60 ++
 dbms/src/Flash/Planner/optimize.cpp           |  49 ++
 dbms/src/Flash/Planner/optimize.h             |  23 +
 dbms/src/Flash/Planner/plans/PhysicalLeaf.h   |  53 ++
 dbms/src/Flash/Planner/plans/PhysicalSource.h |  55 ++
 dbms/src/Flash/Planner/plans/PhysicalUnary.h  |  66 ++
 ...eter.cpp => gtest_planner_interpreter.cpp} | 326 +++++++-
 dbms/src/Flash/tests/gtest_qb_interpreter.cpp | 709 ++++++++++++++++++
 dbms/src/Interpreters/Settings.h              |  13 +-
 22 files changed, 1814 insertions(+), 18 deletions(-)
 create mode 100644 dbms/src/Flash/Planner/PhysicalPlan.cpp
 create mode 100644 dbms/src/Flash/Planner/PhysicalPlan.h
 create mode 100644 dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp
 create mode 100644 dbms/src/Flash/Planner/PhysicalPlanBuilder.h
 create mode 100644 dbms/src/Flash/Planner/PhysicalPlanHelper.cpp
 create mode 100644 dbms/src/Flash/Planner/PhysicalPlanHelper.h
 create mode 100644 dbms/src/Flash/Planner/PlanType.cpp
 create mode 100644 dbms/src/Flash/Planner/PlanType.h
 create mode 100644 dbms/src/Flash/Planner/Planner.cpp
 create mode 100644 dbms/src/Flash/Planner/Planner.h
 create mode 100644 dbms/src/Flash/Planner/optimize.cpp
 create mode 100644 dbms/src/Flash/Planner/optimize.h
 create mode 100644 dbms/src/Flash/Planner/plans/PhysicalLeaf.h
 create mode 100644 dbms/src/Flash/Planner/plans/PhysicalSource.h
 create mode 100644 dbms/src/Flash/Planner/plans/PhysicalUnary.h
 rename dbms/src/Flash/tests/{gtest_interpreter.cpp => gtest_planner_interpreter.cpp} (59%)
 create mode 100644 dbms/src/Flash/tests/gtest_qb_interpreter.cpp

diff --git a/.github/workflows/license-checker.yml b/.github/workflows/license-checker.yml
index e156c1b2b4c..c4c510677b1 100644
--- a/.github/workflows/license-checker.yml
+++ b/.github/workflows/license-checker.yml
@@ -3,10 +3,10 @@ name: License checker
 on:
   push:
     branches:
-      - master
+      - planner_refactory
   pull_request:
     branches:
-      - master
+      - planner_refactory
 
 jobs:
   check-license:
diff --git a/dbms/src/Common/TiFlashException.h b/dbms/src/Common/TiFlashException.h
index 3b4e3d75813..b00fdd3c7c9 100644
--- a/dbms/src/Common/TiFlashException.h
+++ b/dbms/src/Common/TiFlashException.h
@@ -96,6 +96,23 @@ namespace DB
           "This error usually occurs when the TiFlash server is busy or the TiFlash node is down.\n", \
           ""); \
     ) \
+    C(Planner, \
+      E(BadRequest, "Bad TiDB DAGRequest.", \
+        "This error is usually caused by an incorrect TiDB DAGRequest. \n" \
+        "Please contact the developers, \n" \
+        "preferably with information about your cluster (logs, topology, etc.).", \
+        ""); \
+      E(Unimplemented, "Some features are unimplemented.", \
+        "This error may be caused by unmatched TiDB and TiFlash versions, \n" \
+        "and should not occur in common cases. \n" \
\n" \ + "Please contact with developer, \n" \ + "better providing information about your cluster(log, topology information etc.).", \ + ""); \ + E(Internal, "TiFlash Planner internal error.", \ + "Please contact with developer, \n" \ + "better providing information about your cluster(log, topology information etc.).", \ + ""); \ + ) \ C(Table, \ E(SchemaVersionError, "Schema version of target table in TiFlash is different from that in query.", \ "TiFlash will sync the newest schema from TiDB before processing every query. \n" \ diff --git a/dbms/src/Flash/CMakeLists.txt b/dbms/src/Flash/CMakeLists.txt index b32202fa6c5..f250029b333 100644 --- a/dbms/src/Flash/CMakeLists.txt +++ b/dbms/src/Flash/CMakeLists.txt @@ -17,6 +17,8 @@ include(${TiFlash_SOURCE_DIR}/cmake/dbms_glob_sources.cmake) add_headers_and_sources(flash_service .) add_headers_and_sources(flash_service ./Coprocessor) add_headers_and_sources(flash_service ./Mpp) +add_headers_and_sources(flash_service ./Planner) +add_headers_and_sources(flash_service ./Planner/plans) add_headers_and_sources(flash_service ./Statistics) add_headers_and_sources(flash_service ./Management) diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index a67ebf20aa5..570271ec93b 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include namespace DB @@ -65,12 +66,24 @@ BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block) BlockInputStreams child_streams = executeQueryBlock(*child); input_streams_vec.push_back(child_streams); } - DAGQueryBlockInterpreter query_block_interpreter( - context, - input_streams_vec, - query_block, - max_streams); - return query_block_interpreter.execute(); + if (context.getSettingsRef().enable_planner && Planner::isSupported(query_block)) + { + Planner planner( + context, + input_streams_vec, + query_block, + max_streams); + return planner.execute(); + } + else + { + DAGQueryBlockInterpreter query_block_interpreter( + context, + input_streams_vec, + query_block, + max_streams); + return query_block_interpreter.execute(); + } } BlockIO InterpreterDAG::execute() diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp new file mode 100644 index 00000000000..73fcb839ae1 --- /dev/null +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -0,0 +1,73 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+PhysicalPlan::PhysicalPlan(
+    const String & executor_id_,
+    const PlanType & type_,
+    const NamesAndTypes & schema_,
+    const String & req_id)
+    : executor_id(executor_id_)
+    , type(type_)
+    , schema(schema_)
+    , log(Logger::get(type_.toString(), req_id))
+{}
+
+String PhysicalPlan::toString()
+{
+    auto schema_to_string = [&]() {
+        FmtBuffer buffer;
+        buffer.joinStr(
+            schema.cbegin(),
+            schema.cend(),
+            [](const auto & item, FmtBuffer & buf) { buf.fmtAppend("<{}, {}>", item.name, item.type->getName()); },
+            ", ");
+        return buffer.toString();
+    };
+    return fmt::format(
+        "type: {}, executor_id: {}, is_record_profile_streams: {}, schema: {}",
+        type.toString(),
+        executor_id,
+        is_record_profile_streams,
+        schema_to_string());
+}
+
+void PhysicalPlan::finalize()
+{
+    finalize(PhysicalPlanHelper::schemaToNames(schema));
+}
+
+void PhysicalPlan::recordProfileStreams(DAGPipeline & pipeline, const Context & context)
+{
+    if (is_record_profile_streams)
+    {
+        auto & profile_streams = context.getDAGContext()->getProfileStreamsMap()[executor_id];
+        pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); });
+    }
+}
+
+void PhysicalPlan::transform(DAGPipeline & pipeline, Context & context, size_t max_streams)
+{
+    transformImpl(pipeline, context, max_streams);
+    recordProfileStreams(pipeline, context);
+}
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/PhysicalPlan.h b/dbms/src/Flash/Planner/PhysicalPlan.h
new file mode 100644
index 00000000000..8a69545f10b
--- /dev/null
+++ b/dbms/src/Flash/Planner/PhysicalPlan.h
@@ -0,0 +1,83 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+namespace DB
+{
+struct DAGPipeline;
+class Context;
+class DAGContext;
+
+class PhysicalPlan;
+using PhysicalPlanPtr = std::shared_ptr<PhysicalPlan>;
+
+class PhysicalPlan
+{
+public:
+    PhysicalPlan(
+        const String & executor_id_,
+        const PlanType & type_,
+        const NamesAndTypes & schema_,
+        const String & req_id);
+
+    virtual ~PhysicalPlan() = default;
+
+    virtual PhysicalPlanPtr children(size_t /*i*/) const = 0;
+
+    virtual void setChild(size_t /*i*/, const PhysicalPlanPtr & /*new_child*/) = 0;
+
+    const PlanType & tp() const { return type; }
+
+    const String & execId() const { return executor_id; }
+
+    const NamesAndTypes & getSchema() const { return schema; }
+
+    virtual void appendChild(const PhysicalPlanPtr & /*new_child*/) = 0;
+
+    virtual size_t childrenSize() const = 0;
+
+    virtual void transform(DAGPipeline & pipeline, Context & context, size_t max_streams);
+
+    virtual void finalize(const Names & parent_require) = 0;
+    void finalize();
+
+    /// Obtain a sample block that contains the names and types of result columns.
+    virtual const Block & getSampleBlock() const = 0;
+
+    void disableRecordProfileStreams() { is_record_profile_streams = false; }
+
+    String toString();
+
+protected:
+    virtual void transformImpl(DAGPipeline & /*pipeline*/, Context & /*context*/, size_t /*max_streams*/) {}
+
+    void recordProfileStreams(DAGPipeline & pipeline, const Context & context);
+
+    String executor_id;
+    PlanType type;
+    NamesAndTypes schema;
+    bool is_record_profile_streams = true;
+
+    LoggerPtr log;
+};
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp b/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp
new file mode 100644
index 00000000000..b4037746ae5
--- /dev/null
+++ b/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp
@@ -0,0 +1,24 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <Flash/Planner/PhysicalPlanBuilder.h>
+#include <Flash/Planner/plans/PhysicalSource.h>
+
+namespace DB
+{
+void PhysicalPlanBuilder::buildSource(const Block & sample_block)
+{
+    cur_plans.push_back(PhysicalSource::build(sample_block, log->identifier()));
+}
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/PhysicalPlanBuilder.h b/dbms/src/Flash/Planner/PhysicalPlanBuilder.h
new file mode 100644
index 00000000000..bc97d84f5b3
--- /dev/null
+++ b/dbms/src/Flash/Planner/PhysicalPlanBuilder.h
@@ -0,0 +1,47 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+class PhysicalPlanBuilder
+{
+public:
+    explicit PhysicalPlanBuilder(Context & context_, const String & req_id)
+        : context(context_)
+        , log(Logger::get("PhysicalPlanBuilder", req_id))
+    {}
+
+    void buildSource(const Block & sample_block);
+
+    PhysicalPlanPtr getResult() const
+    {
+        RUNTIME_ASSERT(cur_plans.size() == 1, log, "There can only be one plan output, but got {}", cur_plans.size());
+        return cur_plans.back();
+    }
+
+private:
+    std::vector<PhysicalPlanPtr> cur_plans;
+
+    [[maybe_unused]] Context & context;
+
+    LoggerPtr log;
+};
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp b/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp
new file mode 100644
index 00000000000..456ea70101e
--- /dev/null
+++ b/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp
@@ -0,0 +1,27 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <Flash/Planner/PhysicalPlanHelper.h>
+
+namespace DB::PhysicalPlanHelper
+{
+Names schemaToNames(const NamesAndTypes & schema)
+{
+    Names names;
+    names.reserve(schema.size());
+    for (const auto & column : schema)
+        names.push_back(column.name);
+    return names;
+}
+} // namespace DB::PhysicalPlanHelper
diff --git a/dbms/src/Flash/Planner/PhysicalPlanHelper.h b/dbms/src/Flash/Planner/PhysicalPlanHelper.h
new file mode 100644
index 00000000000..8a39921ec51
--- /dev/null
+++ b/dbms/src/Flash/Planner/PhysicalPlanHelper.h
@@ -0,0 +1,22 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <Core/NamesAndTypes.h>
+
+namespace DB::PhysicalPlanHelper
+{
+Names schemaToNames(const NamesAndTypes & schema);
+} // namespace DB::PhysicalPlanHelper
diff --git a/dbms/src/Flash/Planner/PlanType.cpp b/dbms/src/Flash/Planner/PlanType.cpp
new file mode 100644
index 00000000000..131a9c13b3a
--- /dev/null
+++ b/dbms/src/Flash/Planner/PlanType.cpp
@@ -0,0 +1,30 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <Common/TiFlashException.h>
+#include <Flash/Planner/PlanType.h>
+
+namespace DB
+{
+String PlanType::toString() const
+{
+    switch (enum_value)
+    {
+    case Source:
+        return "Source";
+    default:
+        throw TiFlashException("Unknown PlanType", Errors::Planner::Internal);
+    }
+}
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/PlanType.h b/dbms/src/Flash/Planner/PlanType.h
new file mode 100644
index 00000000000..9a5f26a497b
--- /dev/null
+++ b/dbms/src/Flash/Planner/PlanType.h
@@ -0,0 +1,46 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+
+namespace DB
+{
+struct PlanType
+{
+    enum PlanTypeEnum
+    {
+        Source = 0,
+    };
+    PlanTypeEnum enum_value;
+
+    PlanType(int value = 0) // NOLINT(google-explicit-constructor)
+        : enum_value(static_cast<PlanTypeEnum>(value))
+    {}
+
+    PlanType & operator=(int value)
+    {
+        this->enum_value = static_cast<PlanTypeEnum>(value);
+        return *this;
+    }
+
+    operator int() const // NOLINT(google-explicit-constructor)
+    {
+        return this->enum_value;
+    }
+
+    String toString() const;
+};
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/Planner.cpp b/dbms/src/Flash/Planner/Planner.cpp
new file mode 100644
index 00000000000..3ccfc1234d3
--- /dev/null
+++ b/dbms/src/Flash/Planner/Planner.cpp
@@ -0,0 +1,78 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+Planner::Planner(
+    Context & context_,
+    const std::vector<BlockInputStreams> & input_streams_vec_,
+    const DAGQueryBlock & query_block_,
+    size_t max_streams_)
+    : context(context_)
+    , input_streams_vec(input_streams_vec_)
+    , query_block(query_block_)
+    , max_streams(max_streams_)
+    , log(Logger::get("Planner", dagContext().log ? dagContext().log->identifier() : ""))
+{}
+
+BlockInputStreams Planner::execute()
+{
+    DAGPipeline pipeline;
+    executeImpl(pipeline);
+    if (!pipeline.streams_with_non_joined_data.empty())
+    {
+        executeUnion(pipeline, max_streams, log);
+        restorePipelineConcurrency(pipeline);
+    }
+    return pipeline.streams;
+}
+
+bool Planner::isSupported(const DAGQueryBlock &)
+{
+    return false;
+}
+
+DAGContext & Planner::dagContext() const
+{
+    return *context.getDAGContext();
+}
+
+void Planner::restorePipelineConcurrency(DAGPipeline & pipeline)
+{
+    if (query_block.can_restore_pipeline_concurrency)
+        restoreConcurrency(pipeline, dagContext().final_concurrency, log);
+}
+
+void Planner::executeImpl(DAGPipeline & pipeline)
+{
+    PhysicalPlanBuilder builder{context, log->identifier()};
+    for (const auto & input_streams : input_streams_vec)
+    {
+        RUNTIME_ASSERT(!input_streams.empty(), log, "input streams cannot be empty");
+        builder.buildSource(input_streams.back()->getHeader());
+    }
+
+    auto physical_plan = builder.getResult();
+    physical_plan = optimize(context, physical_plan);
+    physical_plan->transform(pipeline, context, max_streams);
+}
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/Planner.h b/dbms/src/Flash/Planner/Planner.h
new file mode 100644
index 00000000000..482d9cc5d76
--- /dev/null
+++ b/dbms/src/Flash/Planner/Planner.h
@@ -0,0 +1,60 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+#include
+#include
+
+namespace DB
+{
+class Context;
+class DAGContext;
+
+class Planner
+{
+public:
+    Planner(
+        Context & context_,
+        const std::vector<BlockInputStreams> & input_streams_vec_,
+        const DAGQueryBlock & query_block_,
+        size_t max_streams_);
+
+    ~Planner() = default;
+
+    BlockInputStreams execute();
+
+    static bool isSupported(const DAGQueryBlock & query_block);
+
+private:
+    DAGContext & dagContext() const;
+
+    void restorePipelineConcurrency(DAGPipeline & pipeline);
+
+    void executeImpl(DAGPipeline & pipeline);
+
+private:
+    Context & context;
+
+    std::vector<BlockInputStreams> input_streams_vec;
+
+    const DAGQueryBlock & query_block;
+
+    /// Max number of streams for processing.
+    size_t max_streams = 1;
+
+    LoggerPtr log;
+};
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/optimize.cpp b/dbms/src/Flash/Planner/optimize.cpp
new file mode 100644
index 00000000000..244ddd534b6
--- /dev/null
+++ b/dbms/src/Flash/Planner/optimize.cpp
@@ -0,0 +1,49 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+
+namespace DB
+{
+class Rule
+{
+public:
+    virtual PhysicalPlanPtr apply(const Context & context, PhysicalPlanPtr plan) = 0;
+
+    virtual ~Rule() = default;
+};
+using RulePtr = std::shared_ptr<Rule>;
+
+class FinalizeRule : public Rule
+{
+public:
+    PhysicalPlanPtr apply(const Context &, PhysicalPlanPtr plan) override
+    {
+        plan->finalize();
+        return plan;
+    }
+
+    static RulePtr create() { return std::make_shared<FinalizeRule>(); }
+};
+
+PhysicalPlanPtr optimize(const Context & context, PhysicalPlanPtr plan)
+{
+    assert(plan);
+    static std::vector<RulePtr> rules{FinalizeRule::create()};
+    for (const auto & rule : rules)
+        plan = rule->apply(context, plan);
+    return plan;
+}
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/optimize.h b/dbms/src/Flash/Planner/optimize.h
new file mode 100644
index 00000000000..8ba738c9f77
--- /dev/null
+++ b/dbms/src/Flash/Planner/optimize.h
@@ -0,0 +1,23 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <Flash/Planner/PhysicalPlan.h>
+
+namespace DB
+{
+class Context;
+PhysicalPlanPtr optimize(const Context & context, PhysicalPlanPtr plan);
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/plans/PhysicalLeaf.h b/dbms/src/Flash/Planner/plans/PhysicalLeaf.h
new file mode 100644
index 00000000000..50ced412c13
--- /dev/null
+++ b/dbms/src/Flash/Planner/plans/PhysicalLeaf.h
@@ -0,0 +1,53 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <Common/TiFlashException.h>
+#include <Flash/Planner/PhysicalPlan.h>
+
+namespace DB
+{
+/**
+ * A physical plan node with no children.
+ */
+class PhysicalLeaf : public PhysicalPlan
+{
+public:
+    PhysicalLeaf(
+        const String & executor_id_,
+        const PlanType & type_,
+        const NamesAndTypes & schema_,
+        const String & req_id)
+        : PhysicalPlan(executor_id_, type_, schema_, req_id)
+    {}
+
+    PhysicalPlanPtr children(size_t) const override
+    {
+        throw TiFlashException("the children size of PhysicalLeaf is zero", Errors::Planner::Internal);
+    }
+
+    void setChild(size_t, const PhysicalPlanPtr &) override
+    {
+        throw TiFlashException("the children size of PhysicalLeaf is zero", Errors::Planner::Internal);
+    }
+
+    void appendChild(const PhysicalPlanPtr &) override
+    {
+        throw TiFlashException("the children size of PhysicalLeaf is zero", Errors::Planner::Internal);
+    }
+
+    size_t childrenSize() const override { return 0; }
+};
+} // namespace DB
\ No newline at end of file
diff --git a/dbms/src/Flash/Planner/plans/PhysicalSource.h b/dbms/src/Flash/Planner/plans/PhysicalSource.h
new file mode 100644
index 00000000000..6b6837de107
--- /dev/null
+++ b/dbms/src/Flash/Planner/plans/PhysicalSource.h
@@ -0,0 +1,55 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <Core/Block.h>
+#include <Flash/Planner/plans/PhysicalLeaf.h>
+
+namespace DB
+{
+class PhysicalSource : public PhysicalLeaf
+{
+public:
+    static PhysicalPlanPtr build(
+        const Block & sample_block,
+        const String & req_id)
+    {
+        NamesAndTypes schema;
+        for (const auto & col : sample_block)
+            schema.emplace_back(col.name, col.type);
+        return std::make_shared<PhysicalSource>("source", schema, sample_block, req_id);
+    }
+
+    PhysicalSource(
+        const String & executor_id_,
+        const NamesAndTypes & schema_,
+        const Block & sample_block_,
+        const String & req_id)
+        : PhysicalLeaf(executor_id_, PlanType::Source, schema_, req_id)
+        , sample_block(sample_block_)
+    {
+        is_record_profile_streams = false;
+    }
+
+    void transformImpl(DAGPipeline &, Context &, size_t) override {}
+
+    void finalize(const Names &) override {}
+
+    const Block & getSampleBlock() const override { return sample_block; }
+
+private:
+    Block sample_block;
+};
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/plans/PhysicalUnary.h b/dbms/src/Flash/Planner/plans/PhysicalUnary.h
new file mode 100644
index 00000000000..4d0091bb8e3
--- /dev/null
+++ b/dbms/src/Flash/Planner/plans/PhysicalUnary.h
@@ -0,0 +1,66 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+/**
+ * A physical plan node with a single child.
+ */ +class PhysicalUnary : public PhysicalPlan +{ +public: + PhysicalUnary( + const String & executor_id_, + const PlanType & type_, + const NamesAndTypes & schema_, + const String & req_id) + : PhysicalPlan(executor_id_, type_, schema_, req_id) + {} + + PhysicalPlanPtr children(size_t i) const override + { + RUNTIME_ASSERT(i == 0, log, "child_index({}) should not >= childrenSize({})", i, childrenSize()); + assert(child); + return child; + } + + void setChild(size_t i, const PhysicalPlanPtr & new_child) override + { + RUNTIME_ASSERT(i == 0, log, "child_index({}) should not >= childrenSize({})", i, childrenSize()); + assert(new_child); + assert(new_child.get() != this); + child = new_child; + } + + void appendChild(const PhysicalPlanPtr & new_child) override + { + RUNTIME_ASSERT(!child, log, "the actual children size had be the max size({}), don't append child again", childrenSize()); + assert(new_child); + assert(new_child.get() != this); + child = new_child; + } + + size_t childrenSize() const override { return 1; }; + +protected: + PhysicalPlanPtr child; +}; +} // namespace DB diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp similarity index 59% rename from dbms/src/Flash/tests/gtest_interpreter.cpp rename to dbms/src/Flash/tests/gtest_planner_interpreter.cpp index a6bb8ff1702..acb5ae0d2c9 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp @@ -19,13 +19,15 @@ namespace DB { namespace tests { -class InterpreterExecuteTest : public DB::tests::ExecutorTest +class PlannerInterpreterExecuteTest : public DB::tests::ExecutorTest { public: void initializeContext() override { ExecutorTest::initializeContext(); + context.context.setSetting("enable_planner", "true"); + context.addMockTable({"test_db", "test_table"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}); context.addMockTable({"test_db", "test_table_1"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}}); context.addMockTable({"test_db", "r_table"}, {{"r_a", TiDB::TP::TypeLong}, {"r_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); @@ -36,7 +38,7 @@ class InterpreterExecuteTest : public DB::tests::ExecutorTest } }; -TEST_F(InterpreterExecuteTest, SingleQueryBlock) +TEST_F(PlannerInterpreterExecuteTest, SimpleQuery) try { auto request = context.scan("test_db", "test_table_1") @@ -90,7 +92,7 @@ Union: } CATCH -TEST_F(InterpreterExecuteTest, MultipleQueryBlockWithSource) +TEST_F(PlannerInterpreterExecuteTest, ComplexQuery) try { auto request = context.scan("test_db", "test_table_1") @@ -385,5 +387,323 @@ CreatingSets } CATCH +TEST_F(PlannerInterpreterExecuteTest, ParallelQuery) +try +{ + /// executor with table scan + auto request = context.scan("test_db", "test_table_1") + .limit(10) + .build(context); + { + String expected = R"( +Limit, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + SharedQuery x 5: + Limit, limit = 10 + Union: + Limit x 5, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .project({"s1", "s2", "s3"}) + .build(context); + { + String expected = R"( +Expression: + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + Expression x 5: + Expression: + Expression: + 
Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); + { + String expected = R"( +Expression: + Aggregating + Concat + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + Expression x 5: + SharedQuery: + ParallelAggregating, max_threads: 5, final: true + Expression x 5: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .topN("s2", false, 10) + .build(context); + { + String expected = R"( +Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + SharedQuery x 5: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 5: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .filter(eq(col("s2"), col("s3"))) + .build(context); + { + String expected = R"( +Expression: + Expression: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + Expression x 5: + Expression: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + /// other cases + request = context.scan("test_db", "test_table_1") + .limit(10) + .project({"s1", "s2", "s3"}) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"(Expression: + Aggregating + Concat + Expression: + Expression: + Expression: + Limit, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .topN("s2", false, 10) + .project({"s1", "s2", "s3"}) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +Expression: + Aggregating + Concat + Expression: + Expression: + Expression: + Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .project({"s2", "s3"}) + .aggregation({Max(col("s2"))}, {col("s3")}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +Expression: + Aggregating + Concat + 
Expression: + Expression: + Expression: + Expression: + Aggregating + Concat + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected = R"( +Union: + MockExchangeSender x 10 + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +MockExchangeSender + Expression: + Aggregating + Concat + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .topN("s2", false, 10) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected = R"( +Union: + MockExchangeSender x 10 + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +MockExchangeSender + Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .limit(10) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected = R"( +Union: + MockExchangeSender x 10 + SharedQuery: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +MockExchangeSender + Limit, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + DAGRequestBuilder table1 = context.scan("test_db", "r_table"); + DAGRequestBuilder table2 = context.scan("test_db", "l_table"); + request = table1.join(table2.limit(1), {col("join_c")}, ASTTableJoin::Kind::Left).build(context); + { + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + SharedQuery: + Limit, limit = 1 + Union: + Limit x 10, limit = 1 + Expression: + MockTableScan + Union: + Expression x 10: + Expression: + HashJoinProbe: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } +} +CATCH + } // namespace tests } // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/tests/gtest_qb_interpreter.cpp b/dbms/src/Flash/tests/gtest_qb_interpreter.cpp new file mode 100644 index 00000000000..c8ac422fdb3 --- /dev/null +++ b/dbms/src/Flash/tests/gtest_qb_interpreter.cpp @@ -0,0 +1,709 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include + +namespace DB +{ +namespace tests +{ +class QBInterpreterExecuteTest : public DB::tests::ExecutorTest +{ +public: + void initializeContext() override + { + ExecutorTest::initializeContext(); + + context.context.setSetting("enable_planner", "false"); + + context.addMockTable({"test_db", "test_table"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}); + context.addMockTable({"test_db", "test_table_1"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}}); + context.addMockTable({"test_db", "r_table"}, {{"r_a", TiDB::TP::TypeLong}, {"r_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); + context.addMockTable({"test_db", "l_table"}, {{"l_a", TiDB::TP::TypeLong}, {"l_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); + context.addExchangeRelationSchema("sender_1", {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}}); + context.addExchangeRelationSchema("sender_l", {{"l_a", TiDB::TP::TypeString}, {"l_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); + context.addExchangeRelationSchema("sender_r", {{"r_a", TiDB::TP::TypeString}, {"r_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); + } +}; + +TEST_F(QBInterpreterExecuteTest, SingleQueryBlock) +try +{ + auto request = context.scan("test_db", "test_table_1") + .filter(eq(col("s2"), col("s3"))) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .filter(eq(col("s2"), col("s3"))) + .topN("s2", false, 10) + .build(context); + { + String expected = R"( +Union: + SharedQuery x 10: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Filter: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + request = context.scan("test_db", "test_table_1") + .filter(eq(col("s2"), col("s3"))) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .filter(eq(col("s2"), col("s3"))) + .limit(10) + .build(context); + + { + String expected = R"( +Union: + SharedQuery x 10: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + Expression: + Filter: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } +} +CATCH + +TEST_F(QBInterpreterExecuteTest, MultipleQueryBlockWithSource) +try +{ + auto request = context.scan("test_db", "test_table_1") + .project({"s1", "s2", "s3"}) + .project({"s1", "s2"}) + .project("s1") + .build(context); + { + String expected = R"( +Union: + Expression x 10: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + request = context.scan("test_db", "test_table_1") + .project({"s1", "s2", "s3"}) + .topN({{"s1", true}, {"s2", false}}, 10) + .project({"s1", "s2"}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + request = context.scan("test_db", "test_table_1") + .project({"s1", "s2", "s3"}) + .topN({{"s1", true}, {"s2", false}}, 10) + .project({"s1", 
"s2"}) + .aggregation({Max(col("s1"))}, {col("s1"), col("s2")}) + .project({"max(s1)", "s1", "s2"}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + Expression: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + request = context.scan("test_db", "test_table_1") + .project({"s1", "s2", "s3"}) + .topN({{"s1", true}, {"s2", false}}, 10) + .project({"s1", "s2"}) + .aggregation({Max(col("s1"))}, {col("s1"), col("s2")}) + .project({"max(s1)", "s1", "s2"}) + .filter(eq(col("s1"), col("s2"))) + .project({"max(s1)", "s1"}) + .limit(10) + .build(context); + { + String expected = R"( +Union: + SharedQuery x 10: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + Expression: + Expression: + Expression: + Expression: + Filter: + Expression: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + // Join Source. + DAGRequestBuilder table1 = context.scan("test_db", "r_table"); + DAGRequestBuilder table2 = context.scan("test_db", "l_table"); + DAGRequestBuilder table3 = context.scan("test_db", "r_table"); + DAGRequestBuilder table4 = context.scan("test_db", "l_table"); + + request = table1.join( + table2.join( + table3.join(table4, + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left) + .build(context); + { + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + MockTableScan + Union x 2: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: + MockTableScan + Union: + Expression x 10: + Expression: + HashJoinProbe: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + request = context.receive("sender_1") + .project({"s1", "s2", "s3"}) + .project({"s1", "s2"}) + .project("s1") + .build(context); + { + String expected = R"( +Union: + Expression x 10: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + MockExchangeReceiver)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + request = context.receive("sender_1") + .project({"s1", "s2", "s3"}) + .project({"s1", "s2"}) + .project("s1") + .exchangeSender(tipb::Broadcast) + .build(context); + { + String expected = R"( +Union: + MockExchangeSender x 10 + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + MockExchangeReceiver)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + // only join + ExchangeReceiver + DAGRequestBuilder receiver1 = context.receive("sender_l"); + DAGRequestBuilder receiver2 = context.receive("sender_r"); + DAGRequestBuilder receiver3 = context.receive("sender_l"); + DAGRequestBuilder receiver4 = 
context.receive("sender_r"); + + request = receiver1.join( + receiver2.join( + receiver3.join(receiver4, + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left) + .build(context); + { + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + MockExchangeReceiver + Union x 2: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: + MockExchangeReceiver + Union: + Expression x 10: + Expression: + HashJoinProbe: + Expression: + MockExchangeReceiver)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + // join + receiver + sender + // TODO: Find a way to write the request easier. + DAGRequestBuilder receiver5 = context.receive("sender_l"); + DAGRequestBuilder receiver6 = context.receive("sender_r"); + DAGRequestBuilder receiver7 = context.receive("sender_l"); + DAGRequestBuilder receiver8 = context.receive("sender_r"); + request = receiver5.join( + receiver6.join( + receiver7.join(receiver8, + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + MockExchangeReceiver + Union x 2: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: + MockExchangeReceiver + Union: + MockExchangeSender x 10 + Expression: + Expression: + HashJoinProbe: + Expression: + MockExchangeReceiver)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } +} +CATCH + +TEST_F(QBInterpreterExecuteTest, ParallelQuery) +try +{ + /// executor with table scan + auto request = context.scan("test_db", "test_table_1") + .limit(10) + .build(context); + { + String expected = R"( +Limit, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + SharedQuery x 5: + Limit, limit = 10 + Union: + Limit x 5, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .project({"s1", "s2", "s3"}) + .build(context); + { + String expected = R"( +Expression: + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + Expression x 5: + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); + { + String expected = R"( +Expression: + Aggregating + Concat + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + Expression x 5: + SharedQuery: + ParallelAggregating, max_threads: 5, final: true + Expression x 5: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .topN("s2", false, 10) + .build(context); + { + String expected = R"( +Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); 
+ + expected = R"( +Union: + SharedQuery x 5: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 5: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .filter(eq(col("s2"), col("s3"))) + .build(context); + { + String expected = R"( +Expression: + Expression: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + Expression x 5: + Expression: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + /// other cases + request = context.scan("test_db", "test_table_1") + .limit(10) + .project({"s1", "s2", "s3"}) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"(Expression: + Aggregating + Concat + Expression: + Expression: + Expression: + Limit, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .topN("s2", false, 10) + .project({"s1", "s2", "s3"}) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +Expression: + Aggregating + Concat + Expression: + Expression: + Expression: + Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .project({"s2", "s3"}) + .aggregation({Max(col("s2"))}, {col("s3")}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +Expression: + Aggregating + Concat + Expression: + Expression: + Expression: + Expression: + Aggregating + Concat + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected = R"( +Union: + MockExchangeSender x 10 + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +MockExchangeSender + Expression: + Aggregating + Concat + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .topN("s2", false, 10) + 
.exchangeSender(tipb::PassThrough) + .build(context); + { + String expected = R"( +Union: + MockExchangeSender x 10 + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +MockExchangeSender + Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .limit(10) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected = R"( +Union: + MockExchangeSender x 10 + SharedQuery: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +MockExchangeSender + Limit, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + DAGRequestBuilder table1 = context.scan("test_db", "r_table"); + DAGRequestBuilder table2 = context.scan("test_db", "l_table"); + request = table1.join(table2.limit(1), {col("join_c")}, ASTTableJoin::Kind::Left).build(context); + { + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + SharedQuery: + Limit, limit = 1 + Union: + Limit x 10, limit = 1 + Expression: + MockTableScan + Union: + Expression x 10: + Expression: + HashJoinProbe: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } +} +CATCH + +} // namespace tests +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 9361e0525d2..b827cb8b6a0 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -209,7 +209,7 @@ struct Settings * Basically, limits are checked for each block (not every row). That is, the limits can be slightly violated. \ * Almost all limits apply only to SELECTs. \ * Almost all limits apply to each stream individually. \ - */ \ + */ \ \ M(SettingUInt64, max_rows_to_read, 0, "Limit on read rows from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it " \ "is only checked on a remote server.") \ @@ -355,17 +355,16 @@ struct Settings M(SettingUInt64, elastic_threadpool_init_cap, 400, "The size of elastic thread pool.") \ M(SettingUInt64, elastic_threadpool_shrink_period_ms, 300000, "The shrink period(ms) of elastic thread pool.") \ M(SettingBool, enable_local_tunnel, true, "Enable local data transfer between local MPP tasks.") \ - M(SettingBool, enable_async_grpc_client, true, "Enable async grpc in MPP.") \ - M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means using hardware_concurrency.")\ + M(SettingBool, enable_async_grpc_client, true, "Enable async grpc in MPP.") \ + M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 
0 means using hardware_concurrency.") \ M(SettingBool, enable_async_server, true, "Enable async rpc server.") \ M(SettingUInt64, async_pollers_per_cq, 200, "grpc async pollers per cqs") \ M(SettingUInt64, async_cqs, 1, "grpc async cqs") \ M(SettingUInt64, preallocated_request_count_per_poller, 20, "grpc preallocated_request_count_per_poller") \ - \ M(SettingUInt64, manual_compact_pool_size, 1, "The number of worker threads to handle manual compact requests.") \ - M(SettingUInt64, manual_compact_max_concurrency, 10, "Max concurrent tasks. It should be larger than pool size.") \ - M(SettingUInt64, manual_compact_more_until_ms, 60000, "Continuously compact more segments until reaching specified elapsed time. If 0 is specified, only one segment will be compacted each round.") - + M(SettingUInt64, manual_compact_max_concurrency, 10, "Max concurrent tasks. It should be larger than pool size.") \ + M(SettingUInt64, manual_compact_more_until_ms, 60000, "Continuously compact more segments until reaching specified elapsed time. If 0 is specified, only one segment will be compacted each round.") \ + M(SettingBool, enable_planner, true, "Enable planner") // clang-format on #define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT};