Skip to content

Commit

Permalink
planner: move base logical plan to logical operator pkg. (#53293)
Browse files Browse the repository at this point in the history
ref #51664, ref #52714
  • Loading branch information
AilinKid authored May 27, 2024
1 parent 2300ac6 commit 57d0b40
Show file tree
Hide file tree
Showing 44 changed files with 823 additions and 634 deletions.
2 changes: 2 additions & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ go_library(
"//pkg/planner/core/cost",
"//pkg/planner/core/metrics",
"//pkg/planner/core/operator/baseimpl",
"//pkg/planner/core/operator/logicalop",
"//pkg/planner/funcdep",
"//pkg/planner/property",
"//pkg/planner/util",
Expand Down Expand Up @@ -260,6 +261,7 @@ go_test(
"//pkg/parser/terror",
"//pkg/planner",
"//pkg/planner/core/base",
"//pkg/planner/core/operator/logicalop",
"//pkg/planner/property",
"//pkg/planner/util",
"//pkg/planner/util/coretestsdk",
Expand Down
1 change: 1 addition & 0 deletions pkg/planner/core/base/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "base",
srcs = [
"doc.go",
"misc_base.go",
"plan_base.go",
"task_base.go",
Expand Down
33 changes: 33 additions & 0 deletions pkg/planner/core/base/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package base

// Read this if you want to change the content of the base interface definitions.
// There are several things you should think twice about before adding a new method.
//
// 1: an interface should stay minimal and capture abstract logic shared by most
// implementors; if your method is only needed by a few implementors, do not add it to the interface.
//
// 2: declaring an interface method here means the implementation logic
// must be added where the implementors are declared (a pointer receiver
// method can only be declared in the package where its type is defined).
//
// 3: an interface definition should cover only the abstract logic; do not depend on
// a concrete implementor type, or rely on handling logic from other core pkgs.
// Otherwise an import cycle occurs; think about the abstraction again.
//
// 4: if you decide to add an additional interface method, please append it to the
// end of the method list, so that the corresponding implementations are
// easy to locate later.
3 changes: 3 additions & 0 deletions pkg/planner/core/base/misc_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"github.com/pingcap/tipb/go-tipb"
)

// Note: append newly added methods to the end of the interface, so that they are
// easy to locate in implementors from other packages.

// AccessObject represents what is accessed by an operator.
// It corresponds to the "access object" column in an EXPLAIN statement result.
type AccessObject interface {
Expand Down
14 changes: 10 additions & 4 deletions pkg/planner/core/base/plan_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type PlanContext = context.PlanContext
// BuildPBContext is the context for building `*tipb.Executor`.
type BuildPBContext = context.BuildPBContext

// Note: append newly added methods to the end of the interface, so that they are
// easy to locate in implementors from other packages.

// Plan is the description of an execution flow.
// It is created from ast.Node first, then optimized by the optimizer,
// finally used by the executor to create a Cursor which executes the statement.
Expand Down Expand Up @@ -217,9 +220,6 @@ type LogicalPlan interface {
// interface definition should depend on concrete implementation type.
PushDownTopN(topN LogicalPlan, opt *optimizetrace.LogicalOptimizeOp) LogicalPlan

// ConvertOuterToInnerJoin converts outer joins if the unmatching rows are filtered.
ConvertOuterToInnerJoin(predicates []expression.Expression) LogicalPlan

// DeriveTopN derives an implicit TopN from a filter on row_number window function...
DeriveTopN(opt *optimizetrace.LogicalOptimizeOp) LogicalPlan

Expand Down Expand Up @@ -263,7 +263,7 @@ type LogicalPlan interface {
// MaxOneRow means whether this operator only returns max one row.
MaxOneRow() bool

// Get all the children.
// Children gets all the children.
Children() []LogicalPlan

// SetChildren sets the children for the plan.
Expand All @@ -280,4 +280,10 @@ type LogicalPlan interface {

// ExtractFD derive the FDSet from the tree bottom up.
ExtractFD() *fd.FDSet

// GetBaseLogicalPlan returns the baseLogicalPlan inside each logical plan.
GetBaseLogicalPlan() LogicalPlan

// ConvertOuterToInnerJoin converts outer joins if the unmatching rows are filtered.
ConvertOuterToInnerJoin(predicates []expression.Expression) LogicalPlan
}
3 changes: 3 additions & 0 deletions pkg/planner/core/base/task_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

package base

// Note: append newly added methods to the end of the interface, so that they are
// easy to locate in implementors from other packages.

// Task is a new version of `PhysicalPlanInfo`. It stores cost information for a task.
// A task may be CopTask, RootTask, MPPTaskMeta or a ParallelTask.
type Task interface {
Expand Down
3 changes: 3 additions & 0 deletions pkg/planner/core/core_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import (
func init() {
// For code refactor init.
utilfuncp.AddSelection = addSelection
utilfuncp.FindBestTask = findBestTask
utilfuncp.HasMaxOneRowUtil = HasMaxOneRow
utilfuncp.GetTaskPlanCost = getTaskPlanCost
utilfuncp.CanPushToCopImpl = canPushToCopImpl
utilfuncp.AppendCandidate4PhysicalOptimizeOp = appendCandidate4PhysicalOptimizeOp
utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan

// For mv index init.
cardinality.GetTblInfoForUsedStatsByPhysicalID = getTblInfoForUsedStatsByPhysicalID
Expand Down
84 changes: 37 additions & 47 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/planner/cardinality"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/cost"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/planner/util/fixcontrol"
Expand Down Expand Up @@ -455,7 +456,7 @@ func (p *LogicalJoin) getHashJoin(prop *property.PhysicalProperty, innerIdx int,
chReqProps[1-innerIdx] = &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, CTEProducerStatus: prop.CTEProducerStatus}
if prop.ExpectedCnt < p.StatsInfo().RowCount {
expCntScale := prop.ExpectedCnt / p.StatsInfo().RowCount
chReqProps[1-innerIdx].ExpectedCnt = p.children[1-innerIdx].StatsInfo().RowCount * expCntScale
chReqProps[1-innerIdx].ExpectedCnt = p.Children()[1-innerIdx].StatsInfo().RowCount * expCntScale
}
hashJoin := NewPhysicalHashJoin(p, innerIdx, useOuterToBuild, p.StatsInfo().ScaleByExpectCnt(prop.ExpectedCnt), chReqProps...)
hashJoin.SetSchema(p.schema)
Expand Down Expand Up @@ -498,7 +499,7 @@ func (p *LogicalJoin) constructIndexJoin(
chReqProps[outerIdx] = &property.PhysicalProperty{TaskTp: property.RootTaskType, ExpectedCnt: math.MaxFloat64, SortItems: prop.SortItems, CTEProducerStatus: prop.CTEProducerStatus}
if prop.ExpectedCnt < p.StatsInfo().RowCount {
expCntScale := prop.ExpectedCnt / p.StatsInfo().RowCount
chReqProps[outerIdx].ExpectedCnt = p.children[outerIdx].StatsInfo().RowCount * expCntScale
chReqProps[outerIdx].ExpectedCnt = p.Children()[outerIdx].StatsInfo().RowCount * expCntScale
}
newInnerKeys := make([]*expression.Column, 0, len(innerJoinKeys))
newOuterKeys := make([]*expression.Column, 0, len(outerJoinKeys))
Expand Down Expand Up @@ -714,7 +715,7 @@ func (p *LogicalJoin) constructIndexHashJoin(
// Then, we will extract the join keys of p's equal conditions. Then check whether all of them are just the primary key
// or match some part of on index. If so we will choose the best one and construct a index join.
func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, outerIdx int) (joins []base.PhysicalPlan) {
outerChild, innerChild := p.children[outerIdx], p.children[1-outerIdx]
outerChild, innerChild := p.Children()[outerIdx], p.Children()[1-outerIdx]
all, _ := prop.AllSameOrder()
// If the order by columns are not all from outer child, index join cannot promise the order.
if !prop.AllColsFromSchema(outerChild.Schema()) || !all {
Expand Down Expand Up @@ -2318,17 +2319,17 @@ func calcHashExchangeSizeByChild(p1 base.Plan, p2 base.Plan, mppStoreCnt int) (r
// Set a scale factor (`mppStoreCnt^*`) when estimating broadcast join in `isJoinFitMPPBCJ` and `isJoinChildFitMPPBCJ` (based on TPCH benchmark, it has been verified in Q9).

func isJoinFitMPPBCJ(p *LogicalJoin, mppStoreCnt int) bool {
rowBC, szBC, hasSizeBC := calcBroadcastExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt)
rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt)
rowBC, szBC, hasSizeBC := calcBroadcastExchangeSizeByChild(p.Children()[0], p.Children()[1], mppStoreCnt)
rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.Children()[0], p.Children()[1], mppStoreCnt)
if hasSizeBC && hasSizeHash {
return szBC*float64(mppStoreCnt) <= szHash
}
return rowBC*float64(mppStoreCnt) <= rowHash
}

func isJoinChildFitMPPBCJ(p *LogicalJoin, childIndexToBC int, mppStoreCnt int) bool {
rowBC, szBC, hasSizeBC := calcBroadcastExchangeSize(p.children[childIndexToBC], mppStoreCnt)
rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt)
rowBC, szBC, hasSizeBC := calcBroadcastExchangeSize(p.Children()[childIndexToBC], mppStoreCnt)
rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.Children()[0], p.Children()[1], mppStoreCnt)

if hasSizeBC && hasSizeHash {
return szBC*float64(mppStoreCnt) <= szHash
Expand Down Expand Up @@ -2368,11 +2369,11 @@ func (p *LogicalJoin) preferMppBCJ() bool {
}

if onlyCheckChild1 {
return checkChildFitBC(p.children[1])
return checkChildFitBC(p.Children()[1])
} else if onlyCheckChild0 {
return checkChildFitBC(p.children[0])
return checkChildFitBC(p.Children()[0])
}
return checkChildFitBC(p.children[0]) || checkChildFitBC(p.children[1])
return checkChildFitBC(p.Children()[0]) || checkChildFitBC(p.Children()[1])
}

// ExhaustPhysicalPlans implements LogicalPlan interface
Expand Down Expand Up @@ -2445,7 +2446,7 @@ func (p *LogicalJoin) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]b

if !p.isNAAJ() {
// naaj refuse merge join and index join.
mergeJoins := p.GetMergeJoin(prop, p.schema, p.StatsInfo(), p.children[0].StatsInfo(), p.children[1].StatsInfo())
mergeJoins := p.GetMergeJoin(prop, p.schema, p.StatsInfo(), p.Children()[0].StatsInfo(), p.Children()[1].StatsInfo())
if (p.preferJoinType&h.PreferMergeJoin) > 0 && len(mergeJoins) > 0 {
return mergeJoins, true, nil
}
Expand Down Expand Up @@ -2566,7 +2567,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
preferredBuildIndex := 0
fixedBuildSide := false // Used to indicate whether the build side for the MPP join is fixed or not.
if p.JoinType == InnerJoin {
if p.children[0].StatsInfo().Count() > p.children[1].StatsInfo().Count() {
if p.Children()[0].StatsInfo().Count() > p.Children()[1].StatsInfo().Count() {
preferredBuildIndex = 1
}
} else if p.JoinType.IsSemiJoin() {
Expand All @@ -2575,7 +2576,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
preferredBuildIndex = 1
// MPPOuterJoinFixedBuildSide default value is false
// use MPPOuterJoinFixedBuildSide here as a way to disable using left table as build side
if !p.SCtx().GetSessionVars().MPPOuterJoinFixedBuildSide && p.children[1].StatsInfo().Count() > p.children[0].StatsInfo().Count() {
if !p.SCtx().GetSessionVars().MPPOuterJoinFixedBuildSide && p.Children()[1].StatsInfo().Count() > p.Children()[0].StatsInfo().Count() {
preferredBuildIndex = 0
}
} else {
Expand All @@ -2597,7 +2598,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
if p.JoinType == LeftOuterJoin {
preferredBuildIndex = 1
}
} else if p.children[0].StatsInfo().Count() > p.children[1].StatsInfo().Count() {
} else if p.Children()[0].StatsInfo().Count() > p.Children()[1].StatsInfo().Count() {
preferredBuildIndex = 1
}
}
Expand Down Expand Up @@ -2630,7 +2631,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
expCnt := math.MaxFloat64
if prop.ExpectedCnt < p.StatsInfo().RowCount {
expCntScale := prop.ExpectedCnt / p.StatsInfo().RowCount
expCnt = p.children[1-preferredBuildIndex].StatsInfo().RowCount * expCntScale
expCnt = p.Children()[1-preferredBuildIndex].StatsInfo().RowCount * expCntScale
}
if prop.MPPPartitionTp == property.HashType {
lPartitionKeys, rPartitionKeys := p.GetPotentialPartitionKeys()
Expand Down Expand Up @@ -2887,7 +2888,7 @@ func (la *LogicalApply) GetHashJoin(prop *property.PhysicalProperty) *PhysicalHa

// ExhaustPhysicalPlans implements LogicalPlan interface.
func (la *LogicalApply) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
if !prop.AllColsFromSchema(la.children[0].Schema()) || prop.IsFlashProp() { // for convenient, we don't pass through any prop
if !prop.AllColsFromSchema(la.Children()[0].Schema()) || prop.IsFlashProp() { // for convenient, we don't pass through any prop
la.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
"MPP mode may be blocked because operator `Apply` is not supported now.")
return nil, true, nil
Expand All @@ -2896,7 +2897,7 @@ func (la *LogicalApply) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([
la.SCtx().GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("Parallel Apply rejects the possible order properties of its outer child currently"))
return nil, true, nil
}
disableAggPushDownToCop(la.children[0])
disableAggPushDownToCop(la.Children()[0])
join := la.GetHashJoin(prop)
var columns = make([]*expression.Column, 0, len(la.CorCols))
for _, colColumn := range la.CorCols {
Expand Down Expand Up @@ -3097,21 +3098,10 @@ func (lw *LogicalWindow) ExhaustPhysicalPlans(prop *property.PhysicalProperty) (
return windows, true, nil
}

// ExhaustPhysicalPlans is only for implementing interface. DataSource and Dual generate task in `findBestTask` directly.
func (*baseLogicalPlan) ExhaustPhysicalPlans(*property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
panic("baseLogicalPlan.ExhaustPhysicalPlans() should never be called.")
}

// CanPushToCop checks if it can be pushed to some stores. For TiKV, it only checks datasource.
// For TiFlash, it will check whether the operator is supported, but note that the check might be inaccrate.
func (p *baseLogicalPlan) CanPushToCop(storeTp kv.StoreType) bool {
return canPushToCopImpl(p, storeTp, false)
}

// todo: move canPushToCopImpl to func_pointer_misc when move baseLogicalPlan out of core.
func canPushToCopImpl(p *baseLogicalPlan, storeTp kv.StoreType, considerDual bool) bool {
func canPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bool) bool {
p := lp.GetBaseLogicalPlan().(*logicalop.BaseLogicalPlan)
ret := true
for _, ch := range p.children {
for _, ch := range p.Children() {
switch c := ch.(type) {
case *DataSource:
validDs := false
Expand All @@ -3126,8 +3116,8 @@ func canPushToCopImpl(p *baseLogicalPlan, storeTp kv.StoreType, considerDual boo
}
ret = ret && validDs

_, isTopN := p.self.(*LogicalTopN)
_, isLimit := p.self.(*LogicalLimit)
_, isTopN := p.Self().(*LogicalTopN)
_, isLimit := p.Self().(*LogicalLimit)
if (isTopN || isLimit) && indexMergeIsIntersection {
return false // TopN and Limit cannot be pushed down to the intersection type IndexMerge
}
Expand All @@ -3143,23 +3133,23 @@ func canPushToCopImpl(p *baseLogicalPlan, storeTp kv.StoreType, considerDual boo
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.baseLogicalPlan, storeTp, true)
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, true)
case *LogicalSort:
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.baseLogicalPlan, storeTp, true)
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, true)
case *LogicalProjection:
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.baseLogicalPlan, storeTp, considerDual)
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual)
case *LogicalExpand:
// Expand itself only contains simple col ref and literal projection. (always ok, check its child)
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.baseLogicalPlan, storeTp, considerDual)
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual)
case *LogicalTableDual:
return storeTp == kv.TiFlash && considerDual
case *LogicalAggregation, *LogicalSelection, *LogicalJoin, *LogicalWindow:
Expand Down Expand Up @@ -3191,7 +3181,7 @@ func canPushToCopImpl(p *baseLogicalPlan, storeTp kv.StoreType, considerDual boo

// CanPushToCop implements LogicalPlan interface.
func (la *LogicalAggregation) CanPushToCop(storeTp kv.StoreType) bool {
return la.baseLogicalPlan.CanPushToCop(storeTp) && !la.noCopPushDown
return la.BaseLogicalPlan.CanPushToCop(storeTp) && !la.noCopPushDown
}

func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalProperty) []base.PhysicalPlan {
Expand Down Expand Up @@ -3691,9 +3681,9 @@ func (p *LogicalUnionAll) ExhaustPhysicalPlans(prop *property.PhysicalProperty)
if prop.TaskTp == property.MppTaskType && prop.MPPPartitionTp != property.AnyType {
return nil, true, nil
}
canUseMpp := p.SCtx().GetSessionVars().IsMPPAllowed() && canPushToCopImpl(&p.baseLogicalPlan, kv.TiFlash, true)
chReqProps := make([]*property.PhysicalProperty, 0, len(p.children))
for range p.children {
canUseMpp := p.SCtx().GetSessionVars().IsMPPAllowed() && canPushToCopImpl(&p.BaseLogicalPlan, kv.TiFlash, true)
chReqProps := make([]*property.PhysicalProperty, 0, p.ChildLen())
for range p.Children() {
if canUseMpp && prop.TaskTp == property.MppTaskType {
chReqProps = append(chReqProps, &property.PhysicalProperty{
ExpectedCnt: prop.ExpectedCnt,
Expand All @@ -3710,8 +3700,8 @@ func (p *LogicalUnionAll) ExhaustPhysicalPlans(prop *property.PhysicalProperty)
}.Init(p.SCtx(), p.StatsInfo().ScaleByExpectCnt(prop.ExpectedCnt), p.QueryBlockOffset(), chReqProps...)
ua.SetSchema(p.Schema())
if canUseMpp && prop.TaskTp == property.RootTaskType {
chReqProps = make([]*property.PhysicalProperty, 0, len(p.children))
for range p.children {
chReqProps = make([]*property.PhysicalProperty, 0, p.ChildLen())
for range p.Children() {
chReqProps = append(chReqProps, &property.PhysicalProperty{
ExpectedCnt: prop.ExpectedCnt,
TaskTp: property.MppTaskType,
Expand Down Expand Up @@ -3768,7 +3758,7 @@ func (ls *LogicalSort) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]
return ret, true, nil
}
} else if prop.TaskTp == property.MppTaskType && prop.RejectSort {
if canPushToCopImpl(&ls.baseLogicalPlan, kv.TiFlash, true) {
if canPushToCopImpl(&ls.BaseLogicalPlan, kv.TiFlash, true) {
newProp := prop.CloneEssentialFields()
newProp.RejectSort = true
ps := NominalSort{OnlyColumn: true, ByItems: ls.ByItems}.Init(
Expand Down Expand Up @@ -3824,13 +3814,13 @@ func (p *LogicalSequence) ExhaustPhysicalPlans(prop *property.PhysicalProperty)
}
seqs := make([]base.PhysicalPlan, 0, 2)
for _, propChoice := range possibleChildrenProps {
childReqs := make([]*property.PhysicalProperty, 0, len(p.children))
for i := 0; i < len(p.children)-1; i++ {
childReqs := make([]*property.PhysicalProperty, 0, p.ChildLen())
for i := 0; i < p.ChildLen()-1; i++ {
childReqs = append(childReqs, propChoice[0].CloneEssentialFields())
}
childReqs = append(childReqs, propChoice[1])
seq := PhysicalSequence{}.Init(p.SCtx(), p.StatsInfo(), p.QueryBlockOffset(), childReqs...)
seq.SetSchema(p.children[len(p.children)-1].Schema())
seq.SetSchema(p.Children()[p.ChildLen()-1].Schema())
seqs = append(seqs, seq)
}
return seqs, true, nil
Expand Down
Loading

0 comments on commit 57d0b40

Please sign in to comment.