Commit f37cbd1

Merge branch 'ref_40330_2' of https://github.com/Yisaer/tidb into ref_40330_2

Yisaer committed Jan 17, 2023
2 parents b560c91 + eff2795 commit f37cbd1
Showing 14 changed files with 276 additions and 109 deletions.
12 changes: 6 additions & 6 deletions planner/core/logical_plan_builder.go
@@ -2017,7 +2017,7 @@ func getUintFromNode(ctx sessionctx.Context, n ast.Node, mustInt64orUint64 bool)
return 0, false, true
}
if mustInt64orUint64 {
-if expected := checkParamTypeInt64orUint64(v); !expected {
+if expected, _ := CheckParamTypeInt64orUint64(v); !expected {
return 0, false, false
}
}
@@ -2054,19 +2054,19 @@ func getUintFromNode(ctx sessionctx.Context, n ast.Node, mustInt64orUint64 bool)
return 0, false, false
}

-// check param type for plan cache limit, only allow int64 and uint64 now
+// CheckParamTypeInt64orUint64 check param type for plan cache limit, only allow int64 and uint64 now
// eg: set @a = 1;
-func checkParamTypeInt64orUint64(param *driver.ParamMarkerExpr) bool {
+func CheckParamTypeInt64orUint64(param *driver.ParamMarkerExpr) (bool, uint64) {
val := param.GetValue()
switch v := val.(type) {
case int64:
if v >= 0 {
-return true
+return true, uint64(v)
}
case uint64:
-return true
+return true, v
}
-return false
+return false, 0
}

func extractLimitCountOffset(ctx sessionctx.Context, limit *ast.Limit) (count uint64,
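The widened return value matters for the rest of the patch: the limit parameter is not just validated but also normalized to uint64 so it can later be matched against cached plans. Below is a minimal, runnable sketch of the same type switch, operating on plain Go values rather than *driver.ParamMarkerExpr; the helper and main function are illustrative, not part of the commit.

```go
package main

import "fmt"

// checkLimitParam mirrors CheckParamTypeInt64orUint64's type switch:
// only non-negative int64 and uint64 values are valid LIMIT parameters,
// and valid values are normalized to uint64.
func checkLimitParam(val interface{}) (bool, uint64) {
	switch v := val.(type) {
	case int64:
		if v >= 0 {
			return true, uint64(v)
		}
	case uint64:
		return true, v
	}
	return false, 0
}

func main() {
	for _, val := range []interface{}{int64(10), uint64(20), int64(-1), "x"} {
		ok, n := checkLimitParam(val)
		fmt.Printf("%T(%v) -> ok=%v, normalized=%d\n", val, val, ok, n)
	}
}
```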
11 changes: 0 additions & 11 deletions planner/core/optimizer.go
@@ -393,7 +393,6 @@ func postOptimize(ctx context.Context, sctx sessionctx.Context, plan PhysicalPlan
plan = eliminateUnionScanAndLock(sctx, plan)
plan = enableParallelApply(sctx, plan)
handleFineGrainedShuffle(ctx, sctx, plan)
-checkPlanCacheable(sctx, plan)
propagateProbeParents(plan, nil)
countStarRewrite(plan)
return plan, nil
@@ -966,16 +965,6 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Context
}
}

-// checkPlanCacheable used to check whether a plan can be cached. Plans that
-// meet the following characteristics cannot be cached:
-// 1. Use the TiFlash engine.
-// Todo: make more careful check here.
-func checkPlanCacheable(sctx sessionctx.Context, plan PhysicalPlan) {
-if sctx.GetSessionVars().StmtCtx.UseCache && useTiFlash(plan) {
-sctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: TiFlash plan is un-cacheable"))
-}
-}

// propagateProbeParents doesn't affect the execution plan, it only sets the probeParents field of a PhysicalPlan.
// It's for handling the inconsistency between row count in the statsInfo and the recorded actual row count. Please
// see comments in PhysicalPlan for details.
78 changes: 56 additions & 22 deletions planner/core/plan_cache.go
@@ -158,27 +158,29 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
}
}

-paramNum, paramTypes := parseParamTypes(sctx, params)
+paramTypes := parseParamTypes(sctx, params)

if stmtCtx.UseCache && stmtAst.CachedPlan != nil { // for point query plan
if plan, names, ok, err := getCachedPointPlan(stmtAst, sessVars, stmtCtx); ok {
return plan, names, err
}
}

+limitCountAndOffset, paramErr := ExtractLimitFromAst(stmt.PreparedAst.Stmt, sctx)
+if paramErr != nil {
+return nil, nil, paramErr
+}
if stmtCtx.UseCache { // for non-point plans
if plan, names, ok, err := getCachedPlan(sctx, isNonPrepared, cacheKey, bindSQL, is, stmt,
-paramTypes); err != nil || ok {
+paramTypes, limitCountAndOffset); err != nil || ok {
return plan, names, err
}
}

-return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, latestSchemaVersion, paramNum, paramTypes, bindSQL)
+return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, latestSchemaVersion, paramTypes, bindSQL, limitCountAndOffset)
}

// parseParamTypes get parameters' types in PREPARE statement
-func parseParamTypes(sctx sessionctx.Context, params []expression.Expression) (paramNum int, paramTypes []*types.FieldType) {
-paramNum = len(params)
+func parseParamTypes(sctx sessionctx.Context, params []expression.Expression) (paramTypes []*types.FieldType) {
for _, param := range params {
if c, ok := param.(*expression.Constant); ok { // from binary protocol
paramTypes = append(paramTypes, c.GetType())
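Threading limitCountAndOffset through the lookup means a cached plan is reused only when the current execution binds the same concrete LIMIT values the plan was compiled with. Below is a self-contained sketch of that matching rule, with simplified stand-in types; cachedPlan and reusable are illustrative names, and the real cache additionally hashes a kvcache.Key.

```go
package main

import "fmt"

// cachedPlan is a stand-in for PlanCacheValue: alongside the plan itself it
// records the parameter types and the concrete LIMIT offset/count values
// that were in effect when the plan was built.
type cachedPlan struct {
	plan                string
	paramTypes          []string
	limitOffsetAndCount []uint64
}

// reusable reports whether a cached plan can serve the current execution:
// both the parameter types and the LIMIT values must match exactly.
func reusable(c cachedPlan, paramTypes []string, limitParams []uint64) bool {
	if len(c.paramTypes) != len(paramTypes) || len(c.limitOffsetAndCount) != len(limitParams) {
		return false
	}
	for i := range paramTypes {
		if c.paramTypes[i] != paramTypes[i] {
			return false
		}
	}
	for i := range limitParams {
		if c.limitOffsetAndCount[i] != limitParams[i] {
			return false
		}
	}
	return true
}

func main() {
	// Plan built for a statement with LIMIT ? bound to 10.
	c := cachedPlan{"Limit->TableReader", []string{"bigint"}, []uint64{10}}
	fmt.Println(reusable(c, []string{"bigint"}, []uint64{10})) // true: same LIMIT value
	fmt.Println(reusable(c, []string{"bigint"}, []uint64{20})) // false: LIMIT 20 must replan
}
```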
@@ -221,12 +223,12 @@ func getCachedPointPlan(stmt *ast.Prepared, sessVars *variable.SessionVars, stmt
}

func getCachedPlan(sctx sessionctx.Context, isNonPrepared bool, cacheKey kvcache.Key, bindSQL string,
-is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType) (Plan,
+is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType, limitParams []uint64) (Plan,
[]*types.FieldName, bool, error) {
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx

-candidate, exist := sctx.GetPlanCache(isNonPrepared).Get(cacheKey, paramTypes)
+candidate, exist := sctx.GetPlanCache(isNonPrepared).Get(cacheKey, paramTypes, limitParams)
if !exist {
return nil, nil, false, nil
}
@@ -264,8 +266,9 @@ func getCachedPlan(sctx sessionctx.Context, isNonPrepared bool, cacheKey kvcache

// generateNewPlan call the optimizer to generate a new plan for current statement
// and try to add it to cache
-func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema, stmt *PlanCacheStmt, cacheKey kvcache.Key, latestSchemaVersion int64, paramNum int,
-paramTypes []*types.FieldType, bindSQL string) (Plan, []*types.FieldName, error) {
+func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema,
+stmt *PlanCacheStmt, cacheKey kvcache.Key, latestSchemaVersion int64, paramTypes []*types.FieldType,
+bindSQL string, limitParams []uint64) (Plan, []*types.FieldName, error) {
stmtAst := stmt.PreparedAst
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
@@ -282,10 +285,10 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
return nil, nil, err
}

-// We only cache the tableDual plan when the number of parameters are zero.
-if containTableDual(p) && paramNum > 0 {
-stmtCtx.SetSkipPlanCache(errors.New("skip plan-cache: get a TableDual plan"))
-}
+// check whether this plan is cacheable.
+checkPlanCacheability(sctx, p, len(paramTypes))
+
+// put this plan into the plan cache.
if stmtCtx.UseCache {
// rebuild key to exclude kv.TiFlash when stmt is not read only
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmtAst.Stmt, sessVars) {
@@ -296,16 +299,51 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
}
-cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, paramTypes)
+cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, paramTypes, limitParams)
stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlan(p)
stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
-sctx.GetPlanCache(isNonPrepared).Put(cacheKey, cached, paramTypes)
+sctx.GetPlanCache(isNonPrepared).Put(cacheKey, cached, paramTypes, limitParams)
}
sessVars.FoundInPlanCache = false
return p, names, err
}

+// checkPlanCacheability checks whether this plan is cacheable and set to skip plan cache if it's uncacheable.
+func checkPlanCacheability(sctx sessionctx.Context, p Plan, paramNum int) {
+stmtCtx := sctx.GetSessionVars().StmtCtx
+var pp PhysicalPlan
+switch x := p.(type) {
+case *Insert:
+pp = x.SelectPlan
+case *Update:
+pp = x.SelectPlan
+case *Delete:
+pp = x.SelectPlan
+case PhysicalPlan:
+pp = x
+default:
+stmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: unexpected un-cacheable plan %v", p.ExplainID().String()))
+return
+}
+if pp == nil { // simple DML statements
+return
+}
+
+if useTiFlash(pp) {
+stmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: TiFlash plan is un-cacheable"))
+return
+}
+
+// We only cache the tableDual plan when the number of parameters are zero.
+if containTableDual(pp) && paramNum > 0 {
+stmtCtx.SetSkipPlanCache(errors.New("skip plan-cache: get a TableDual plan"))
+return
+}
+
+// TODO: plans accessing MVIndex are un-cacheable
+}
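checkPlanCacheability first unwraps DML plans (Insert, Update, and Delete carry an optional inner SelectPlan, which is nil for simple statements such as INSERT ... VALUES) and only then applies the physical-plan checks. Below is a simplified sketch of that unwrap-then-check shape, omitting the TiFlash rule and using stand-in types; none of these definitions are TiDB's real ones.

```go
package main

import "fmt"

// Simplified stand-ins for TiDB's plan hierarchy, just to show the
// unwrap-then-check shape of checkPlanCacheability.
type Plan interface{ ID() string }

type PhysicalPlan interface {
	Plan
	Children() []PhysicalPlan
}

// Insert wraps an optional inner physical plan (nil for INSERT ... VALUES).
type Insert struct{ SelectPlan PhysicalPlan }

func (*Insert) ID() string { return "Insert" }

type PhysicalTableDual struct{}

func (*PhysicalTableDual) ID() string               { return "TableDual" }
func (*PhysicalTableDual) Children() []PhysicalPlan { return nil }

// cacheable unwraps DML plans to the inner physical plan, then rejects
// parameterized TableDual plans, mirroring two of the patch's rules.
func cacheable(p Plan, paramNum int) (bool, string) {
	var pp PhysicalPlan
	switch x := p.(type) {
	case *Insert:
		pp = x.SelectPlan
	case PhysicalPlan:
		pp = x
	default:
		return false, "unexpected un-cacheable plan"
	}
	if pp == nil { // simple DML statement, nothing to check
		return true, ""
	}
	if _, dual := pp.(*PhysicalTableDual); dual && paramNum > 0 {
		return false, "TableDual plan with parameters"
	}
	return true, ""
}

func main() {
	fmt.Println(cacheable(&Insert{}, 1))            // true: INSERT ... VALUES
	fmt.Println(cacheable(&PhysicalTableDual{}, 1)) // false: parameterized TableDual
}
```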

// RebuildPlan4CachedPlan will rebuild this plan under current user parameters.
func RebuildPlan4CachedPlan(p Plan) error {
sc := p.SCtx().GetSessionVars().StmtCtx
@@ -675,17 +713,13 @@ func tryCachePointPlan(_ context.Context, sctx sessionctx.Context,
return err
}

-func containTableDual(p Plan) bool {
+func containTableDual(p PhysicalPlan) bool {
_, isTableDual := p.(*PhysicalTableDual)
if isTableDual {
return true
}
-physicalPlan, ok := p.(PhysicalPlan)
-if !ok {
-return false
-}
childContainTableDual := false
-for _, child := range physicalPlan.Children() {
+for _, child := range p.Children() {
childContainTableDual = childContainTableDual || containTableDual(child)
}
return childContainTableDual
36 changes: 28 additions & 8 deletions planner/core/plan_cache_lru.go
@@ -53,7 +53,7 @@ type LRUPlanCache struct {
lock sync.Mutex

// pickFromBucket get one element from bucket. The LRUPlanCache can not work if it is nil
-pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType) (*list.Element, bool)
+pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType, []uint64) (*list.Element, bool)
// onEvict will be called if any eviction happened, only for test use now
onEvict func(kvcache.Key, kvcache.Value)

@@ -68,7 +68,7 @@ type LRUPlanCache struct {
// NewLRUPlanCache creates a PCLRUCache object, whose capacity is "capacity".
// NOTE: "capacity" should be a positive value.
func NewLRUPlanCache(capacity uint, guard float64, quota uint64,
-pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType) (*list.Element, bool), sctx sessionctx.Context) *LRUPlanCache {
+pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType, []uint64) (*list.Element, bool), sctx sessionctx.Context) *LRUPlanCache {
if capacity < 1 {
capacity = 100
logutil.BgLogger().Info("capacity of LRU cache is less than 1, will use default value(100) init cache")
Expand All @@ -94,13 +94,13 @@ func strHashKey(key kvcache.Key, deepCopy bool) string {
}

// Get tries to find the corresponding value according to the given key.
-func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType) (value kvcache.Value, ok bool) {
+func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType, limitParams []uint64) (value kvcache.Value, ok bool) {
l.lock.Lock()
defer l.lock.Unlock()

bucket, bucketExist := l.buckets[strHashKey(key, false)]
if bucketExist {
-if element, exist := l.pickFromBucket(bucket, paramTypes); exist {
+if element, exist := l.pickFromBucket(bucket, paramTypes, limitParams); exist {
l.lruList.MoveToFront(element)
return element.Value.(*planCacheEntry).PlanValue, true
}
@@ -109,14 +109,14 @@ func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType) (value kvcache.Value, ok bool) {
}

// Put puts the (key, value) pair into the LRU Cache.
-func (l *LRUPlanCache) Put(key kvcache.Key, value kvcache.Value, paramTypes []*types.FieldType) {
+func (l *LRUPlanCache) Put(key kvcache.Key, value kvcache.Value, paramTypes []*types.FieldType, limitParams []uint64) {
l.lock.Lock()
defer l.lock.Unlock()

hash := strHashKey(key, true)
bucket, bucketExist := l.buckets[hash]
if bucketExist {
-if element, exist := l.pickFromBucket(bucket, paramTypes); exist {
+if element, exist := l.pickFromBucket(bucket, paramTypes, limitParams); exist {
l.updateInstanceMetric(&planCacheEntry{PlanKey: key, PlanValue: value}, element.Value.(*planCacheEntry))
element.Value.(*planCacheEntry).PlanValue = value
l.lruList.MoveToFront(element)
@@ -252,16 +252,36 @@ func (l *LRUPlanCache) memoryControl() {
}

// PickPlanFromBucket pick one plan from bucket
-func PickPlanFromBucket(bucket map[*list.Element]struct{}, paramTypes []*types.FieldType) (*list.Element, bool) {
+func PickPlanFromBucket(bucket map[*list.Element]struct{}, paramTypes []*types.FieldType, limitParams []uint64) (*list.Element, bool) {
for k := range bucket {
plan := k.Value.(*planCacheEntry).PlanValue.(*PlanCacheValue)
-if plan.ParamTypes.CheckTypesCompatibility4PC(paramTypes) {
+ok1 := plan.ParamTypes.CheckTypesCompatibility4PC(paramTypes)
+if !ok1 {
+continue
+}
+ok2 := checkUint64SliceIfEqual(plan.limitOffsetAndCount, limitParams)
+if ok2 {
return k, true
}
}
return nil, false
}

+func checkUint64SliceIfEqual(a, b []uint64) bool {
+if (a == nil && b != nil) || (a != nil && b == nil) {
+return false
+}
+if len(a) != len(b) {
+return false
+}
+for i := range a {
+if a[i] != b[i] {
+return false
+}
+}
+return true
+}
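A subtlety in checkUint64SliceIfEqual: because of the explicit nil checks, nil only matches nil, so a non-nil empty slice does not match a nil one even though both have length zero. A quick demonstration of the committed function's semantics (the function body is copied from the patch; main is illustrative):

```go
package main

import "fmt"

// checkUint64SliceIfEqual, as in the patch: nil only equals nil,
// then lengths and elements must match pairwise.
func checkUint64SliceIfEqual(a, b []uint64) bool {
	if (a == nil && b != nil) || (a != nil && b == nil) {
		return false
	}
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(checkUint64SliceIfEqual(nil, nil))                         // true
	fmt.Println(checkUint64SliceIfEqual(nil, []uint64{}))                  // false: nil vs non-nil empty
	fmt.Println(checkUint64SliceIfEqual([]uint64{1, 10}, []uint64{1, 10})) // true
	fmt.Println(checkUint64SliceIfEqual([]uint64{1, 10}, []uint64{1, 20})) // false
}
```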

// updateInstanceMetric update the memory usage and plan num for show in grafana
func (l *LRUPlanCache) updateInstanceMetric(in, out *planCacheEntry) {
updateInstancePlanNum(in, out)
