planner: enable revive for part code #44382

Merged (16 commits), Jun 5, 2023
17 changes: 4 additions & 13 deletions build/nogo_config.json
@@ -491,27 +491,18 @@
".*_test\\.go$": "ignore generated code",
".*_generated\\.go$": "ignore generated code",
"planner/core/resolve_indices.go": "planner/core/resolve_indices.go",
"planner/core/rule_result_reorder.go": "planner/core/rule_result_reorder.go",
"planner/core/rule_join_reorder_dp.go": "planner/core/rule_join_reorder_dp.go",
"planner/core/plan_cache_param.go": "planner/core/plan_cache_param.go",
"planner/core/rule_predicate_push_down.go": "planner/core/rule_predicate_push_down.go",
"planner/core/rule_aggregation_push_down.go": "planner/core/rule_aggregation_push_down.go",
"planner/core/rule_max_min_eliminate.go": "planner/core/rule_max_min_eliminate.go",
"planner/core/rule_predicate_simplification.go": "planner/core/rule_predicate_simplification.go",
"planner/core/rule_max_min_eliminate.go": "planner/core/rule_max_min_eliminate.go",
"planner/core/indexmerge_path.go": "planner/core/indexmerge_path.go",
"planner/core/rule_join_reorder.go": "planner/core/rule_join_reorder.go",
"planner/core/plan_cacheable_checker.go": "planner/core/plan_cacheable_checker.go",
"planner/core/rule_decorrelate.go": "planner/core/rule_decorrelate.go",
"planner/core/plan_cache_utils.go": "planner/core/plan_cache_utils.go",
"planner/core/rule_aggregation_skew_rewrite.go": "planner/core/rule_aggregation_skew_rewrite.go",
"planner/core/rule_topn_push_down.go": "planner/core/rule_topn_push_down.go",
"planner/core/plan_cost_ver1.go": "planner/core/plan_cost_ver1.go",
"planner/core/expression_rewriter.go": "planner/core/expression_rewriter.go",
"planner/core/find_best_task.go": "planner/core/find_best_task.go",
"planner/core/rule_topn_push_down.go": "planner/core/rule_topn_push_down.go",
"planner/core/expression_rewriter.go": "planner/core/expression_rewriter.go",
"planner/core/task.go": "planner/core/task.go",
"planner/core/preprocess.go": "planner/core/preprocess.go",
"planner/core/rule_partition_processor.go": "planner/core/rule_partition_processor.go",
"planner/core/exhaust_physical_plans.go": "planner/core/exhaust_physical_plans.go",
"planner/core/preprocess.go": "planner/core/preprocess.go",
"planner/core/plan_cache_lru.go": "planner/core/plan_cache_lru.go",
"planner/core/common_plans.go": "planner/core/common_plans.go",
"plugin/conn_ip_example/": "plugin/conn_ip_example/"
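The entries dropped above take these planner files off the nogo exclusion list, so revive now checks them, and the Go changes below mostly silence revive's unused-receiver and unused-parameter warnings. A minimal sketch of that pattern, with hypothetical type and method names rather than the real planner types:

```go
package main

// Before the cleanup, a method like this would be flagged: the receiver p and
// the second parameter are both declared but never used in the body.
//
//	func (p *logicalJoin) checkKeys(keys []int, extra []int) bool { return len(keys) > 0 }
//
// After: the unused receiver becomes anonymous and the unused parameter is
// replaced with the blank identifier, mirroring the signature edits in this PR.
type logicalJoin struct{}

func (*logicalJoin) checkKeys(keys []int, _ []int) bool {
	return len(keys) > 0
}

func main() {
	j := &logicalJoin{}
	println(j.checkKeys([]int{1, 2}, nil))
}
```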
93 changes: 15 additions & 78 deletions planner/core/exhaust_physical_plans.go
@@ -139,7 +139,7 @@ func (p *PhysicalMergeJoin) tryToGetChildReqProp(prop *property.PhysicalProperty
return []*property.PhysicalProperty{lProp, rProp}, true
}

func (p *LogicalJoin) checkJoinKeyCollation(leftKeys, rightKeys []*expression.Column) bool {
func (*LogicalJoin) checkJoinKeyCollation(leftKeys, rightKeys []*expression.Column) bool {
// if a left key and its corresponding right key have different collation, don't use MergeJoin since
// their children may sort their records in different ways
for i := range leftKeys {
@@ -1022,7 +1022,7 @@ func (ijHelper *indexJoinBuildHelper) buildRangeDecidedByInformation(idxCols []*
func (p *LogicalJoin) constructInnerTableScanTask(
wrapper *indexJoinInnerChildWrapper,
ranges ranger.Ranges,
outerJoinKeys []*expression.Column,
_ []*expression.Column,
rangeInfo string,
keepOrder bool,
desc bool,
@@ -1115,7 +1115,7 @@ func (p *LogicalJoin) constructInnerByWrapper(wrapper *indexJoinInnerChildWrappe
return child
}

func (p *LogicalJoin) constructInnerSel(sel *LogicalSelection, child PhysicalPlan) PhysicalPlan {
func (*LogicalJoin) constructInnerSel(sel *LogicalSelection, child PhysicalPlan) PhysicalPlan {
if sel == nil {
return child
}
@@ -1126,7 +1126,7 @@ func (p *LogicalJoin) constructInnerSel(sel *LogicalSelection, child PhysicalPla
return physicalSel
}

func (p *LogicalJoin) constructInnerProj(proj *LogicalProjection, child PhysicalPlan) PhysicalPlan {
func (*LogicalJoin) constructInnerProj(proj *LogicalProjection, child PhysicalPlan) PhysicalPlan {
if proj == nil {
return child
}
@@ -1139,7 +1139,7 @@ func (p *LogicalJoin) constructInnerProj(proj *LogicalProjection, child Physical
return physicalProj
}

func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader PhysicalPlan) PhysicalPlan {
func (*LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader PhysicalPlan) PhysicalPlan {
if us == nil {
return reader
}
@@ -1153,76 +1153,13 @@ func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader Physi
return physicalUnionScan
}

func getColsNDVLowerBoundFromHistColl(cols []*expression.Column, histColl *statistics.HistColl) int64 {
if len(cols) == 0 || histColl == nil {
return -1
}
colUIDs := make([]int64, len(cols))
for i, col := range cols {
colUIDs[i] = col.UniqueID
}

// Note that we don't need to specially handle prefix index in this function, because the NDV of a prefix index is
// equal or less than the corresponding normal index, and that's safe here since we want a lower bound.

// 1. Try to get NDV from column stats if it's a single column.
if len(colUIDs) == 1 && histColl.Columns != nil {
uid := colUIDs[0]
if colStats, ok := histColl.Columns[uid]; ok && colStats != nil {
return colStats.NDV
}
}

slices.Sort(colUIDs)
if histColl.Indices == nil || histColl.Idx2ColumnIDs == nil {
return -1
}

// 2. Try to get NDV from index stats.
for idxID, idxCols := range histColl.Idx2ColumnIDs {
if len(idxCols) != len(colUIDs) {
continue
}
orderedIdxCols := make([]int64, len(idxCols))
copy(orderedIdxCols, idxCols)
slices.Sort(orderedIdxCols)
if !slices.Equal(orderedIdxCols, colUIDs) {
continue
}
if idxStats, ok := histColl.Indices[idxID]; ok && idxStats != nil {
return idxStats.NDV
}
}

// TODO: if there's an index that contains the expected columns, we can also make use of its NDV.
// For example, NDV(a,b,c) / NDV(c) is a safe lower bound of NDV(a,b).

// 3. If we still haven't got an NDV, we use the minimal NDV in the column stats as a lower bound.
// This would happen when len(cols) > 1 and no proper index stats are available.
minNDV := int64(-1)
for _, colStats := range histColl.Columns {
if colStats == nil || colStats.Info == nil {
continue
}
col := colStats.Info
if col.IsGenerated() && !col.GeneratedStored {
continue
}
if (colStats.NDV > 0 && minNDV <= 0) ||
colStats.NDV < minNDV {
minNDV = colStats.NDV
}
}
return minNDV
}

// constructInnerIndexScanTask is specially used to construct the inner plan for PhysicalIndexJoin.
func (p *LogicalJoin) constructInnerIndexScanTask(
wrapper *indexJoinInnerChildWrapper,
path *util.AccessPath,
ranges ranger.Ranges,
filterConds []expression.Expression,
innerJoinKeys []*expression.Column,
_ []*expression.Column,
rangeInfo string,
keepOrder bool,
desc bool,
@@ -2092,18 +2029,18 @@ func checkChildFitBC(p Plan) bool {
return p.SCtx().GetSessionVars().BroadcastJoinThresholdSize == -1 || sz < float64(p.SCtx().GetSessionVars().BroadcastJoinThresholdSize)
}

func calcBroadcastExchangeSize(p Plan, mppStoreCnt int) (float64, float64, bool) {
func calcBroadcastExchangeSize(p Plan, mppStoreCnt int) (row float64, size float64, hasSize bool) {
s := p.statsInfo()
row := float64(s.Count()) * float64(mppStoreCnt-1)
row = float64(s.Count()) * float64(mppStoreCnt-1)
if s.HistColl == nil {
return row, 0, false
}
avg := s.HistColl.GetAvgRowSize(p.SCtx(), p.Schema().Columns, false, false)
size := avg * row
size = avg * row
return row, size, true
}

func calcBroadcastExchangeSizeByChild(p1 Plan, p2 Plan, mppStoreCnt int) (float64, float64, bool) {
func calcBroadcastExchangeSizeByChild(p1 Plan, p2 Plan, mppStoreCnt int) (row float64, size float64, hasSize bool) {
row1, size1, hasSize1 := calcBroadcastExchangeSize(p1, mppStoreCnt)
row2, size2, hasSize2 := calcBroadcastExchangeSize(p2, mppStoreCnt)

@@ -2119,18 +2056,18 @@ func calcBroadcastExchangeSizeByChild(p1 Plan, p2 Plan, mppStoreCnt int) (float6
return math.Min(row1, row2), 0, false
}

func calcHashExchangeSize(p Plan, mppStoreCnt int) (float64, float64, bool) {
func calcHashExchangeSize(p Plan, mppStoreCnt int) (row float64, sz float64, hasSize bool) {
s := p.statsInfo()
row := float64(s.Count()) * float64(mppStoreCnt-1) / float64(mppStoreCnt)
row = float64(s.Count()) * float64(mppStoreCnt-1) / float64(mppStoreCnt)
if s.HistColl == nil {
return row, 0, false
}
avg := s.HistColl.GetAvgRowSize(p.SCtx(), p.Schema().Columns, false, false)
sz := avg * row
sz = avg * row
return row, sz, true
}

func calcHashExchangeSizeByChild(p1 Plan, p2 Plan, mppStoreCnt int) (float64, float64, bool) {
func calcHashExchangeSizeByChild(p1 Plan, p2 Plan, mppStoreCnt int) (row float64, size float64, hasSize bool) {
row1, size1, hasSize1 := calcHashExchangeSize(p1, mppStoreCnt)
row2, size2, hasSize2 := calcHashExchangeSize(p2, mppStoreCnt)

@@ -2867,7 +2804,7 @@ func (lw *LogicalWindow) exhaustPhysicalPlans(prop *property.PhysicalProperty) (
}

// exhaustPhysicalPlans is only for implementing interface. DataSource and Dual generate task in `findBestTask` directly.
func (p *baseLogicalPlan) exhaustPhysicalPlans(_ *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
func (*baseLogicalPlan) exhaustPhysicalPlans(*property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
panic("baseLogicalPlan.exhaustPhysicalPlans() should never be called.")
}

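One aside on the getColsNDVLowerBoundFromHistColl body removed from this file above: its TODO notes that NDV(a,b,c) / NDV(c) is a safe lower bound for NDV(a,b), because each distinct (a,b) pair can combine with at most NDV(c) values of c. A self-contained toy check of that inequality, using made-up rows and none of the real statistics types:

```go
package main

import "fmt"

// Each distinct (a,b) pair expands into at most NDV(c) distinct (a,b,c)
// triples, so NDV(a,b) >= NDV(a,b,c) / NDV(c). The rows below are invented
// purely to illustrate the bound.
type row struct{ a, b, c int }

func ndv(rows []row, key func(row) string) int {
	seen := make(map[string]struct{})
	for _, r := range rows {
		seen[key(r)] = struct{}{}
	}
	return len(seen)
}

func main() {
	rows := []row{{1, 1, 1}, {1, 1, 2}, {1, 2, 1}, {2, 2, 2}}
	ndvABC := ndv(rows, func(r row) string { return fmt.Sprint(r.a, r.b, r.c) })
	ndvC := ndv(rows, func(r row) string { return fmt.Sprint(r.c) })
	ndvAB := ndv(rows, func(r row) string { return fmt.Sprint(r.a, r.b) })
	fmt.Printf("NDV(a,b,c)=%d NDV(c)=%d lower bound=%d actual NDV(a,b)=%d\n",
		ndvABC, ndvC, ndvABC/ndvC, ndvAB)
}
```

The bound is loose here (2 against an actual 3), which is acceptable since the helper only needs a conservative lower bound.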
13 changes: 6 additions & 7 deletions planner/core/find_best_task.go
@@ -783,7 +783,7 @@ func (ds *DataSource) getIndexCandidate(path *util.AccessPath, prop *property.Ph
return candidate
}

func (ds *DataSource) getIndexMergeCandidate(path *util.AccessPath) *candidatePath {
func (*DataSource) getIndexMergeCandidate(path *util.AccessPath) *candidatePath {
candidate := &candidatePath{path: path}
return candidate
}
@@ -1123,9 +1123,9 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
if allRangeIsPoint {
var pointGetTask task
if len(path.Ranges) == 1 {
pointGetTask = ds.convertToPointGet(prop, candidate, opt)
pointGetTask = ds.convertToPointGet(prop, candidate)
} else {
pointGetTask = ds.convertToBatchPointGet(prop, candidate, hashPartColName, opt)
pointGetTask = ds.convertToBatchPointGet(prop, candidate, hashPartColName)
}

// Batch/PointGet plans may be over-optimized, like `a>=1(?) and a<=1(?)` --> `a=1` --> PointGet(a=1).
@@ -1213,7 +1213,7 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
return
}

func (ds *DataSource) isSafePointGetPlan4PlanCache(path *util.AccessPath) bool {
func (*DataSource) isSafePointGetPlan4PlanCache(path *util.AccessPath) bool {
// PointGet might contain some over-optimized assumptions, like `a>=1 and a<=1` --> `a=1`, but
// these assumptions may be broken after parameters change.

@@ -2307,7 +2307,7 @@ func (ds *DataSource) convertToSampleTable(prop *property.PhysicalProperty,
}, nil
}

func (ds *DataSource) convertToPointGet(prop *property.PhysicalProperty, candidate *candidatePath, opt *physicalOptimizeOp) (task task) {
func (ds *DataSource) convertToPointGet(prop *property.PhysicalProperty, candidate *candidatePath) (task task) {
if !prop.IsSortItemEmpty() && !candidate.isMatchProp {
return invalidTask
}
@@ -2384,8 +2384,7 @@ func (ds *DataSource) convertToPointGet(prop *property.PhysicalProperty, candida
return rTsk
}

func (ds *DataSource) convertToBatchPointGet(prop *property.PhysicalProperty,
candidate *candidatePath, hashPartColName *model.CIStr, opt *physicalOptimizeOp) (task task) {
func (ds *DataSource) convertToBatchPointGet(prop *property.PhysicalProperty, candidate *candidatePath, hashPartColName *model.CIStr) (task task) {
if !prop.IsSortItemEmpty() && !candidate.isMatchProp {
return invalidTask
}
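The guards in findBestTask and isSafePointGetPlan4PlanCache above exist because, as their comments note, a predicate like `a>=1 and a<=1` can be folded into `a=1` and planned as a PointGet; if those constants came from parameter markers and the plan is cached, re-executing it with different parameter values would return wrong rows. A toy model of that divergence, using plain slices instead of real plans or ranges:

```go
package main

import "fmt"

// rangeScan models the general plan for `a >= ? AND a <= ?`.
func rangeScan(rows []int, lo, hi int) (out []int) {
	for _, v := range rows {
		if v >= lo && v <= hi {
			out = append(out, v)
		}
	}
	return
}

// pointGet models the over-optimized plan built when both parameters were 1,
// i.e. the predicate was folded into `a = 1`.
func pointGet(rows []int, key int) (out []int) {
	for _, v := range rows {
		if v == key {
			out = append(out, v)
		}
	}
	return
}

func main() {
	rows := []int{1, 2, 3}
	// With the original parameters (1, 1) the two plans agree.
	fmt.Println(rangeScan(rows, 1, 1), pointGet(rows, 1)) // [1] [1]
	// Reusing the cached point plan with parameters (1, 3) loses rows.
	fmt.Println(rangeScan(rows, 1, 3), pointGet(rows, 1)) // [1 2 3] [1]
}
```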
18 changes: 8 additions & 10 deletions planner/core/plan_cache_param.go
@@ -16,7 +16,6 @@ package core

import (
"bytes"
"context"
"errors"
"sync"

@@ -83,7 +82,7 @@ func (pr *paramReplacer) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
return in, false
}

func (pr *paramReplacer) Leave(in ast.Node) (out ast.Node, ok bool) {
func (*paramReplacer) Leave(in ast.Node) (out ast.Node, ok bool) {
return in, true
}

@@ -94,9 +93,9 @@ func (pr *paramReplacer) Reset() {
// GetParamSQLFromAST returns the parameterized SQL of this AST.
// NOTICE: this function does not modify the original AST.
// paramVals are copied from this AST.
func GetParamSQLFromAST(ctx context.Context, sctx sessionctx.Context, stmt ast.StmtNode) (paramSQL string, paramVals []types.Datum, err error) {
func GetParamSQLFromAST(sctx sessionctx.Context, stmt ast.StmtNode) (paramSQL string, paramVals []types.Datum, err error) {
var params []*driver.ValueExpr
paramSQL, params, err = ParameterizeAST(ctx, sctx, stmt)
paramSQL, params, err = ParameterizeAST(stmt)
if err != nil {
return "", nil, err
}
@@ -105,14 +104,14 @@ func GetParamSQLFromAST(ctx context.Context, sctx sessionctx.Context, stmt ast.S
p.Datum.Copy(&paramVals[i])
}

err = RestoreASTWithParams(ctx, sctx, stmt, params)
err = RestoreASTWithParams(sctx, stmt, params)
return
}

// ParameterizeAST parameterizes this StmtNode.
// e.g. `select * from t where a<10 and b<23` --> `select * from t where a<? and b<?`, [10, 23].
// NOTICE: this function may modify the input stmt.
func ParameterizeAST(ctx context.Context, sctx sessionctx.Context, stmt ast.StmtNode) (paramSQL string, params []*driver.ValueExpr, err error) {
func ParameterizeAST(stmt ast.StmtNode) (paramSQL string, params []*driver.ValueExpr, err error) {
pr := paramReplacerPool.Get().(*paramReplacer)
pCtx := paramCtxPool.Get().(*format.RestoreCtx)
defer func() {
@@ -135,8 +134,7 @@ type paramRestorer struct {
}

func (pr *paramRestorer) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
switch n := in.(type) {
case *driver.ParamMarkerExpr:
if n, ok := in.(*driver.ParamMarkerExpr); ok {
if n.Offset >= len(pr.params) {
pr.err = errors.New("failed to restore ast.Node")
return nil, true
@@ -152,7 +150,7 @@ func (pr *paramRestorer) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
return in, false
}

func (pr *paramRestorer) Leave(in ast.Node) (out ast.Node, ok bool) {
func (*paramRestorer) Leave(in ast.Node) (out ast.Node, ok bool) {
return in, true
}

@@ -162,7 +160,7 @@ func (pr *paramRestorer) Reset() {

// RestoreASTWithParams restores this parameterized AST with specific parameters.
// e.g. `select * from t where a<? and b<?`, [10, 23] --> `select * from t where a<10 and b<23`.
func RestoreASTWithParams(ctx context.Context, _ sessionctx.Context, stmt ast.StmtNode, params []*driver.ValueExpr) error {
func RestoreASTWithParams(_ sessionctx.Context, stmt ast.StmtNode, params []*driver.ValueExpr) error {
pr := paramRestorerPool.Get().(*paramRestorer)
defer func() {
pr.Reset()
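Besides dropping the unused context.Context parameters, this file rewrites a single-case type switch in paramRestorer.Enter as a comma-ok type assertion, presumably to satisfy a single-case-switch style check. A standalone sketch of that rewrite, with toy node types standing in for the real ast and driver ones:

```go
package main

import "fmt"

type paramMarker struct{ offset int }

// describe uses the comma-ok type assertion form. The equivalent single-case
// type switch would be:
//
//	switch n := in.(type) {
//	case *paramMarker:
//		return fmt.Sprintf("param at offset %d", n.offset)
//	}
//
// which some linters flag because a switch with one case reads more directly
// as a plain assertion.
func describe(in interface{}) string {
	if n, ok := in.(*paramMarker); ok {
		return fmt.Sprintf("param at offset %d", n.offset)
	}
	return "some other node"
}

func main() {
	fmt.Println(describe(&paramMarker{offset: 2}))
	fmt.Println(describe(42))
}
```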