Skip to content

Commit

Permalink
planner: some code clean after all logical op are migrated (#56426)
Browse files Browse the repository at this point in the history
ref #51664, ref #52714
  • Loading branch information
AilinKid authored Oct 10, 2024
1 parent ffcfb5e commit 2864508
Show file tree
Hide file tree
Showing 12 changed files with 153 additions and 178 deletions.
1 change: 0 additions & 1 deletion pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ go_library(
"//pkg/planner/util/domainmisc",
"//pkg/planner/util/fixcontrol",
"//pkg/planner/util/optimizetrace",
"//pkg/planner/util/optimizetrace/logicaltrace",
"//pkg/planner/util/tablesampler",
"//pkg/planner/util/utilfuncp",
"//pkg/privilege",
Expand Down
6 changes: 1 addition & 5 deletions pkg/planner/core/core_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ import (

func init() {
// For code refactor init.
utilfuncp.FindBestTask = findBestTask
utilfuncp.PruneByItems = pruneByItems
utilfuncp.CanPushToCopImpl = canPushToCopImpl
utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan
utilfuncp.FindBestTask4BaseLogicalPlan = findBestTask
utilfuncp.FindBestTask4LogicalCTE = findBestTask4LogicalCTE
utilfuncp.FindBestTask4LogicalShow = findBestTask4LogicalShow
utilfuncp.FindBestTask4LogicalCTETable = findBestTask4LogicalCTETable
Expand Down Expand Up @@ -68,7 +65,6 @@ func init() {
utilfuncp.ApplyPredicateSimplification = applyPredicateSimplification
utilfuncp.DeriveStats4LogicalIndexScan = deriveStats4LogicalIndexScan
utilfuncp.DeriveStats4LogicalTableScan = deriveStats4LogicalTableScan
utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan

// For mv index init.
cardinality.GetTblInfoForUsedStatsByPhysicalID = getTblInfoForUsedStatsByPhysicalID
Expand Down
86 changes: 2 additions & 84 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/cardinality"
Expand Down Expand Up @@ -2426,87 +2425,6 @@ func exhaustPhysicalPlans4LogicalWindow(lp base.LogicalPlan, prop *property.Phys
return windows, true, nil
}

func canPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bool) bool {
p := lp.GetBaseLogicalPlan().(*logicalop.BaseLogicalPlan)
ret := true
for _, ch := range p.Children() {
switch c := ch.(type) {
case *logicalop.DataSource:
validDs := false
indexMergeIsIntersection := false
for _, path := range c.PossibleAccessPaths {
if path.StoreType == storeTp {
validDs = true
}
if len(path.PartialIndexPaths) > 0 && path.IndexMergeIsIntersection {
indexMergeIsIntersection = true
}
}
ret = ret && validDs

_, isTopN := p.Self().(*logicalop.LogicalTopN)
_, isLimit := p.Self().(*logicalop.LogicalLimit)
if (isTopN || isLimit) && indexMergeIsIntersection {
return false // TopN and Limit cannot be pushed down to the intersection type IndexMerge
}

if c.TableInfo.TableCacheStatusType != model.TableCacheStatusDisable {
// Don't push to cop for cached table, it brings more harm than good:
// 1. Those tables are small enough, push to cop can't utilize several TiKV to accelerate computation.
// 2. Cached table use UnionScan to read the cache data, and push to cop is not supported when an UnionScan exists.
// Once aggregation is pushed to cop, the cache data can't be use any more.
return false
}
case *logicalop.LogicalUnionAll:
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, true)
case *logicalop.LogicalSort:
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, true)
case *logicalop.LogicalProjection:
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual)
case *logicalop.LogicalExpand:
// Expand itself only contains simple col ref and literal projection. (always ok, check its child)
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual)
case *logicalop.LogicalTableDual:
return storeTp == kv.TiFlash && considerDual
case *logicalop.LogicalAggregation, *logicalop.LogicalSelection, *logicalop.LogicalJoin, *logicalop.LogicalWindow:
if storeTp != kv.TiFlash {
return false
}
ret = ret && c.CanPushToCop(storeTp)
// These operators can be partially push down to TiFlash, so we don't raise warning for them.
case *logicalop.LogicalLimit, *logicalop.LogicalTopN:
return false
case *logicalop.LogicalSequence:
return storeTp == kv.TiFlash
case *logicalop.LogicalCTE:
if storeTp != kv.TiFlash {
return false
}
if c.Cte.RecursivePartLogicalPlan != nil || !c.Cte.SeedPartLogicalPlan.CanPushToCop(storeTp) {
return false
}
return true
default:
p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
"MPP mode may be blocked because operator `" + c.TP() + "` is not supported now.")
return false
}
}
return ret
}

func getEnforcedStreamAggs(la *logicalop.LogicalAggregation, prop *property.PhysicalProperty) []base.PhysicalPlan {
if prop.IsFlashProp() {
return nil
Expand Down Expand Up @@ -2972,7 +2890,7 @@ func exhaustPhysicalPlans4LogicalUnionAll(lp base.LogicalPlan, prop *property.Ph
if prop.TaskTp == property.MppTaskType && prop.MPPPartitionTp != property.AnyType {
return nil, true, nil
}
canUseMpp := p.SCtx().GetSessionVars().IsMPPAllowed() && canPushToCopImpl(&p.BaseLogicalPlan, kv.TiFlash, true)
canUseMpp := p.SCtx().GetSessionVars().IsMPPAllowed() && logicalop.CanPushToCopImpl(&p.BaseLogicalPlan, kv.TiFlash, true)
chReqProps := make([]*property.PhysicalProperty, 0, p.ChildLen())
for range p.Children() {
if canUseMpp && prop.TaskTp == property.MppTaskType {
Expand Down Expand Up @@ -3040,7 +2958,7 @@ func exhaustPhysicalPlans4LogicalSort(lp base.LogicalPlan, prop *property.Physic
return ret, true, nil
}
} else if prop.TaskTp == property.MppTaskType && prop.RejectSort {
if canPushToCopImpl(&ls.BaseLogicalPlan, kv.TiFlash, true) {
if logicalop.CanPushToCopImpl(&ls.BaseLogicalPlan, kv.TiFlash, true) {
ps := getNominalSortSimple(ls, prop)
return []base.PhysicalPlan{ps}, true, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/planner/core/operator/logicalop/base_logical_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (p *BaseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column, opt
// FindBestTask implements LogicalPlan.<3rd> interface.
func (p *BaseLogicalPlan) FindBestTask(prop *property.PhysicalProperty, planCounter *base.PlanCounterTp,
opt *optimizetrace.PhysicalOptimizeOp) (bestTask base.Task, cntPlan int64, err error) {
return utilfuncp.FindBestTask(p, prop, planCounter, opt)
return utilfuncp.FindBestTask4BaseLogicalPlan(p, prop, planCounter, opt)
}

// BuildKeyInfo implements LogicalPlan.<4th> interface.
Expand All @@ -156,7 +156,7 @@ func (p *BaseLogicalPlan) BuildKeyInfo(_ *expression.Schema, _ []*expression.Sch

// PushDownTopN implements the LogicalPlan.<5th> interface.
func (p *BaseLogicalPlan) PushDownTopN(topNLogicalPlan base.LogicalPlan, opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan {
return utilfuncp.PushDownTopNForBaseLogicalPlan(p, topNLogicalPlan, opt)
return pushDownTopNForBaseLogicalPlan(p, topNLogicalPlan, opt)
}

// DeriveTopN implements the LogicalPlan.<6th> interface.
Expand Down Expand Up @@ -309,7 +309,7 @@ func (p *BaseLogicalPlan) RollBackTaskMap(ts uint64) {
// For TiFlash, it will check whether the operator is supported, but note that the check
// might be inaccurate.
func (p *BaseLogicalPlan) CanPushToCop(storeTp kv.StoreType) bool {
return utilfuncp.CanPushToCopImpl(p, storeTp, false)
return CanPushToCopImpl(p, storeTp, false)
}

// ExtractFD implements LogicalPlan.<22nd> interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/operator/logicalop/logical_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (la *LogicalAggregation) PruneColumns(parentUsedCols []*expression.Column,
selfUsedCols = expression.ExtractColumnsFromExpressions(selfUsedCols, aggrFunc.Args, nil)

var cols []*expression.Column
aggrFunc.OrderByItems, cols = utilfuncp.PruneByItems(la, aggrFunc.OrderByItems, opt)
aggrFunc.OrderByItems, cols = pruneByItems(la, aggrFunc.OrderByItems, opt)
selfUsedCols = append(selfUsedCols, cols...)
}
if len(la.AggFuncs) == 0 || (!allFirstRow && allRemainFirstRow) {
Expand Down
136 changes: 136 additions & 0 deletions pkg/planner/core/operator/logicalop/logical_plans_misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ package logicalop

import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/constraint"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace/logicaltrace"
)

var (
Expand Down Expand Up @@ -98,3 +103,134 @@ func addSelection(p base.LogicalPlan, child base.LogicalPlan, conditions []expre
p.Children()[chIdx] = selection
AppendAddSelectionTraceStep(p, child, selection, opt)
}

// pushDownTopNForBaseLogicalPlan can be moved when LogicalTopN has been moved to logicalop.
func pushDownTopNForBaseLogicalPlan(lp base.LogicalPlan, topNLogicalPlan base.LogicalPlan,
opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan {
s := lp.GetBaseLogicalPlan().(*BaseLogicalPlan)
var topN *LogicalTopN
if topNLogicalPlan != nil {
topN = topNLogicalPlan.(*LogicalTopN)
}
p := s.Self()
for i, child := range p.Children() {
p.Children()[i] = child.PushDownTopN(nil, opt)
}
if topN != nil {
return topN.AttachChild(p, opt)
}
return p
}

func pruneByItems(p base.LogicalPlan, old []*util.ByItems, opt *optimizetrace.LogicalOptimizeOp) (byItems []*util.ByItems,
parentUsedCols []*expression.Column) {
prunedByItems := make([]*util.ByItems, 0)
byItems = make([]*util.ByItems, 0, len(old))
seen := make(map[string]struct{}, len(old))
for _, byItem := range old {
pruned := true
hash := string(byItem.Expr.HashCode())
_, hashMatch := seen[hash]
seen[hash] = struct{}{}
cols := expression.ExtractColumns(byItem.Expr)
if !hashMatch {
if len(cols) == 0 {
if !expression.IsRuntimeConstExpr(byItem.Expr) {
pruned = false
byItems = append(byItems, byItem)
}
} else if byItem.Expr.GetType(p.SCtx().GetExprCtx().GetEvalCtx()).GetType() != mysql.TypeNull {
pruned = false
parentUsedCols = append(parentUsedCols, cols...)
byItems = append(byItems, byItem)
}
}
if pruned {
prunedByItems = append(prunedByItems, byItem)
}
}
logicaltrace.AppendByItemsPruneTraceStep(p, prunedByItems, opt)
return
}

// CanPushToCopImpl checks whether the logical plan can be pushed to coprocessor.
func CanPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bool) bool {
p := lp.GetBaseLogicalPlan().(*BaseLogicalPlan)
ret := true
for _, ch := range p.Children() {
switch c := ch.(type) {
case *DataSource:
validDs := false
indexMergeIsIntersection := false
for _, path := range c.PossibleAccessPaths {
if path.StoreType == storeTp {
validDs = true
}
if len(path.PartialIndexPaths) > 0 && path.IndexMergeIsIntersection {
indexMergeIsIntersection = true
}
}
ret = ret && validDs

_, isTopN := p.Self().(*LogicalTopN)
_, isLimit := p.Self().(*LogicalLimit)
if (isTopN || isLimit) && indexMergeIsIntersection {
return false // TopN and Limit cannot be pushed down to the intersection type IndexMerge
}

if c.TableInfo.TableCacheStatusType != model.TableCacheStatusDisable {
// Don't push to cop for cached table, it brings more harm than good:
// 1. Those tables are small enough, push to cop can't utilize several TiKV to accelerate computation.
// 2. Cached table use UnionScan to read the cache data, and push to cop is not supported when an UnionScan exists.
// Once aggregation is pushed to cop, the cache data can't be use any more.
return false
}
case *LogicalUnionAll:
if storeTp != kv.TiFlash {
return false
}
ret = ret && CanPushToCopImpl(&c.BaseLogicalPlan, storeTp, true)
case *LogicalSort:
if storeTp != kv.TiFlash {
return false
}
ret = ret && CanPushToCopImpl(&c.BaseLogicalPlan, storeTp, true)
case *LogicalProjection:
if storeTp != kv.TiFlash {
return false
}
ret = ret && CanPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual)
case *LogicalExpand:
// Expand itself only contains simple col ref and literal projection. (always ok, check its child)
if storeTp != kv.TiFlash {
return false
}
ret = ret && CanPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual)
case *LogicalTableDual:
return storeTp == kv.TiFlash && considerDual
case *LogicalAggregation, *LogicalSelection, *LogicalJoin, *LogicalWindow:
if storeTp != kv.TiFlash {
return false
}
ret = ret && c.CanPushToCop(storeTp)
// These operators can be partially push down to TiFlash, so we don't raise warning for them.
case *LogicalLimit, *LogicalTopN:
return false
case *LogicalSequence:
return storeTp == kv.TiFlash
case *LogicalCTE:
if storeTp != kv.TiFlash {
return false
}
if c.Cte.RecursivePartLogicalPlan != nil || !c.Cte.SeedPartLogicalPlan.CanPushToCop(storeTp) {
return false
}
return true
default:
p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
"MPP mode may be blocked because operator `" + c.TP() + "` is not supported now.")
return false
}
}
return ret
}
2 changes: 1 addition & 1 deletion pkg/planner/core/operator/logicalop/logical_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (ls *LogicalSort) ReplaceExprColumns(replace map[string]*expression.Column)
// we do prune them. Note that we can't prune the expressions contain non-deterministic functions, such as rand().
func (ls *LogicalSort) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
var cols []*expression.Column
ls.ByItems, cols = utilfuncp.PruneByItems(ls, ls.ByItems, opt)
ls.ByItems, cols = pruneByItems(ls, ls.ByItems, opt)
parentUsedCols = append(parentUsedCols, cols...)
var err error
ls.Children()[0], err = ls.Children()[0].PruneColumns(parentUsedCols, opt)
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/operator/logicalop/logical_top_n.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (lt *LogicalTopN) ReplaceExprColumns(replace map[string]*expression.Column)
func (lt *LogicalTopN) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
child := lt.Children()[0]
var cols []*expression.Column
lt.ByItems, cols = utilfuncp.PruneByItems(lt, lt.ByItems, opt)
lt.ByItems, cols = pruneByItems(lt, lt.ByItems, opt)
parentUsedCols = append(parentUsedCols, cols...)
var err error
lt.Children()[0], err = child.PruneColumns(parentUsedCols, opt)
Expand Down
35 changes: 0 additions & 35 deletions pkg/planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,9 @@ import (
"context"
"slices"

"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace/logicaltrace"
"github.com/pingcap/tidb/pkg/util/intest"
)

Expand Down Expand Up @@ -64,37 +60,6 @@ func noZeroColumnLayOut(p base.LogicalPlan) bool {
return true
}

func pruneByItems(p base.LogicalPlan, old []*util.ByItems, opt *optimizetrace.LogicalOptimizeOp) (byItems []*util.ByItems,
parentUsedCols []*expression.Column) {
prunedByItems := make([]*util.ByItems, 0)
byItems = make([]*util.ByItems, 0, len(old))
seen := make(map[string]struct{}, len(old))
for _, byItem := range old {
pruned := true
hash := string(byItem.Expr.HashCode())
_, hashMatch := seen[hash]
seen[hash] = struct{}{}
cols := expression.ExtractColumns(byItem.Expr)
if !hashMatch {
if len(cols) == 0 {
if !expression.IsRuntimeConstExpr(byItem.Expr) {
pruned = false
byItems = append(byItems, byItem)
}
} else if byItem.Expr.GetType(p.SCtx().GetExprCtx().GetEvalCtx()).GetType() != mysql.TypeNull {
pruned = false
parentUsedCols = append(parentUsedCols, cols...)
byItems = append(byItems, byItem)
}
}
if pruned {
prunedByItems = append(prunedByItems, byItem)
}
}
logicaltrace.AppendByItemsPruneTraceStep(p, prunedByItems, opt)
return
}

// Name implements base.LogicalOptRule.<1st> interface.
func (*ColumnPruner) Name() string {
return "column_prune"
Expand Down
Loading

0 comments on commit 2864508

Please sign in to comment.