Skip to content

Commit

Permalink
planner: fix the issue that the optimizer caches wrong TableDual plan…
Browse files Browse the repository at this point in the history
…s under binary protocol (#34709) (#34737)

close #34678, close #34690
  • Loading branch information
ti-srebot authored Jun 24, 2022
1 parent 216f746 commit 8f13676
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 30 deletions.
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
is: b.is,
name: v.Name,
usingVars: v.UsingVars,
usingVars: v.TxtProtoVars,
id: v.ExecID,
stmt: v.Stmt,
plan: v.Plan,
Expand Down
23 changes: 18 additions & 5 deletions planner/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package core

import (
"bytes"
"math"
"sync/atomic"
"time"
Expand Down Expand Up @@ -181,25 +182,37 @@ type PlanCacheValue struct {
Plan Plan
OutPutNames []*types.FieldName
TblInfo2UnionScan map[*model.TableInfo]bool
UserVarTypes FieldSlice
TxtVarTypes FieldSlice // variable types under text protocol
BinVarTypes []byte // variable types under binary protocol
IsBinProto bool // whether this plan is under binary protocol
BindSQL string
}

func (v *PlanCacheValue) varTypesUnchanged(binVarTps []byte, txtVarTps []*types.FieldType) bool {
if v.IsBinProto {
return bytes.Equal(v.BinVarTypes, binVarTps)
}
return v.TxtVarTypes.Equal(txtVarTps)
}

// NewPlanCacheValue creates a SQLCacheValue.
func NewPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*model.TableInfo]bool, userVarTps []*types.FieldType, bindSQL string) *PlanCacheValue {
func NewPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*model.TableInfo]bool,
isBinProto bool, binVarTypes []byte, txtVarTps []*types.FieldType, bindSQL string) *PlanCacheValue {
dstMap := make(map[*model.TableInfo]bool)
for k, v := range srcMap {
dstMap[k] = v
}
userVarTypes := make([]types.FieldType, len(userVarTps))
for i, tp := range userVarTps {
userVarTypes := make([]types.FieldType, len(txtVarTps))
for i, tp := range txtVarTps {
userVarTypes[i] = *tp
}
return &PlanCacheValue{
Plan: plan,
OutPutNames: names,
TblInfo2UnionScan: dstMap,
UserVarTypes: userVarTypes,
TxtVarTypes: userVarTypes,
BinVarTypes: binVarTypes,
IsBinProto: isBinProto,
BindSQL: bindSQL,
}
}
Expand Down
60 changes: 38 additions & 22 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,15 @@ type Prepare struct {
type Execute struct {
baseSchemaProducer

Name string
UsingVars []expression.Expression
PrepareParams []types.Datum
ExecID uint32
SnapshotTS uint64
IsStaleness bool
Name string
TxtProtoVars []expression.Expression // parsed variables under text protocol
BinProtoVars []types.Datum // parsed variables under binary protocol
ExecID uint32
// Deprecated: SnapshotTS now is only used for asserting after refactoring stale read, it will be removed later.
SnapshotTS uint64
// Deprecated: IsStaleness now is only used for asserting after refactoring stale read, it will be removed later.
IsStaleness bool
// Deprecated: ReadReplicaScope now is only used for asserting after refactoring stale read, it will be removed later.
ReadReplicaScope string
Stmt ast.StmtNode
StmtType string
Expand Down Expand Up @@ -228,25 +231,25 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
prepared := preparedObj.PreparedAst
vars.StmtCtx.StmtType = prepared.StmtType

paramLen := len(e.PrepareParams)
paramLen := len(e.BinProtoVars)
if paramLen > 0 {
// for binary protocol execute, argument is placed in vars.PrepareParams
// for binary protocol execute, argument is placed in vars.BinProtoVars
if len(prepared.Params) != paramLen {
return errors.Trace(ErrWrongParamCount)
}
vars.PreparedParams = e.PrepareParams
vars.PreparedParams = e.BinProtoVars
for i, val := range vars.PreparedParams {
param := prepared.Params[i].(*driver.ParamMarkerExpr)
param.Datum = val
param.InExecute = true
}
} else {
// for `execute stmt using @a, @b, @c`, using value in e.UsingVars
if len(prepared.Params) != len(e.UsingVars) {
// for `execute stmt using @a, @b, @c`, using value in e.TxtProtoVars
if len(prepared.Params) != len(e.TxtProtoVars) {
return errors.Trace(ErrWrongParamCount)
}

for i, usingVar := range e.UsingVars {
for i, usingVar := range e.TxtProtoVars {
val, err := usingVar.Eval(chunk.Row{})
if err != nil {
return err
Expand Down Expand Up @@ -449,15 +452,28 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context,
bindSQL = GetBindSQL4PlanCache(sctx, preparedStmt)
cacheKey = NewPlanCacheKey(sctx.GetSessionVars(), e.ExecID, prepared.SchemaVersion)
}
tps := make([]*types.FieldType, len(e.UsingVars))
varsNum := len(e.UsingVars)
for i, param := range e.UsingVars {
name := param.(*expression.ScalarFunction).GetArgs()[0].String()
tps[i] = sctx.GetSessionVars().UserVarTypes[name]
if tps[i] == nil {
tps[i] = types.NewFieldType(mysql.TypeNull)

var varsNum int
var binVarTypes []byte
var txtVarTypes []*types.FieldType
isBinProtocol := len(e.BinProtoVars) > 0
if isBinProtocol { // binary protocol
varsNum = len(e.BinProtoVars)
for _, param := range e.BinProtoVars {
binVarTypes = append(binVarTypes, param.Kind())
}
} else { // txt protocol
varsNum = len(e.TxtProtoVars)
for _, param := range e.TxtProtoVars {
name := param.(*expression.ScalarFunction).GetArgs()[0].String()
tp := sctx.GetSessionVars().UserVarTypes[name]
if tp == nil {
tp = types.NewFieldType(mysql.TypeNull)
}
txtVarTypes = append(txtVarTypes, tp)
}
}

if prepared.CachedPlan != nil {
// Rewriting the expression in the select.where condition will convert its
// type from "paramMarker" to "Constant".When Point Select queries are executed,
Expand Down Expand Up @@ -498,7 +514,7 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context,
sctx.PreparedPlanCache().Delete(cacheKey)
break
}
if !cachedVal.UserVarTypes.Equal(tps) {
if !cachedVal.varTypesUnchanged(binVarTypes, txtVarTypes) {
continue
}
planValid := true
Expand Down Expand Up @@ -567,13 +583,13 @@ REBUILD:
cacheKey = NewPlanCacheKey(sessVars, e.ExecID, prepared.SchemaVersion)
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
}
cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, tps, sessVars.StmtCtx.BindSQL)
cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, isBinProtocol, binVarTypes, txtVarTypes, sessVars.StmtCtx.BindSQL)
preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest)
if cacheVals, exists := sctx.PreparedPlanCache().Get(cacheKey); exists {
hitVal := false
for i, cacheVal := range cacheVals.([]*PlanCacheValue) {
if cacheVal.UserVarTypes.Equal(tps) {
if cacheVal.varTypesUnchanged(binVarTypes, txtVarTypes) {
hitVal = true
cacheVals.([]*PlanCacheValue)[i] = cached
break
Expand Down
4 changes: 2 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,9 +772,9 @@ func (b *PlanBuilder) buildExecute(ctx context.Context, v *ast.ExecuteStmt) (Pla
}
vars = append(vars, newExpr)
}
exe := &Execute{Name: v.Name, UsingVars: vars, ExecID: v.ExecID}
exe := &Execute{Name: v.Name, TxtProtoVars: vars, ExecID: v.ExecID}
if v.BinaryArgs != nil {
exe.PrepareParams = v.BinaryArgs.([]types.Datum)
exe.BinProtoVars = v.BinaryArgs.([]types.Datum)
}
return exe, nil
}
Expand Down

0 comments on commit 8f13676

Please sign in to comment.