Skip to content

Commit

Permalink
bindinfo: support evolve plan (#13465)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored and sre-bot committed Nov 22, 2019
1 parent 71e19a7 commit 518692c
Show file tree
Hide file tree
Showing 13 changed files with 269 additions and 46 deletions.
6 changes: 6 additions & 0 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,4 +523,10 @@ func (s *testSuite) TestAddEvolveTasks(c *C) {
c.Assert(len(rows), Equals, 2)
c.Assert(rows[1][1], Equals, "SELECT /*+ USE_INDEX(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a`>=4 AND `b`>=1 AND `c`=0")
c.Assert(rows[1][3], Equals, "pending verify")
tk.MustExec("admin evolve bindings")
rows = tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 2)
c.Assert(rows[1][1], Equals, "SELECT /*+ USE_INDEX(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a`>=4 AND `b`>=1 AND `c`=0")
status := rows[1][3].(string)
c.Assert(status == "using" || status == "rejected", IsTrue)
}
3 changes: 3 additions & 0 deletions bindinfo/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const (
Invalid = "invalid"
// PendingVerify means the bind info needs to be verified.
PendingVerify = "pending verify"
// Rejected means that the bind has been rejected after verify process.
// We can retry it after certain time has passed.
Rejected = "rejected"
)

// Binding stores the basic bind hint info.
Expand Down
180 changes: 178 additions & 2 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ package bindinfo

import (
"context"
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand All @@ -29,12 +32,14 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/timeutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -80,6 +85,13 @@ type BindHandle struct {
// Lease influences the duration of loading bind info and handling invalid bind.
var Lease = 3 * time.Second

// Identifiers used when creating the owner manager for bindinfo background work.
const (
// OwnerKey is the bindinfo owner path that is saved to etcd.
OwnerKey = "/tidb/bindinfo/owner"
// Prompt is the prompt for bindinfo owner manager.
Prompt = "bindinfo"
)

type bindRecordUpdate struct {
bindRecord *BindRecord
updateTime time.Time
Expand Down Expand Up @@ -170,7 +182,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.InfoSc
if br != nil {
binding := br.FindBinding(record.Bindings[0].id)
if binding != nil {
// There is already a binding with status `Using` or `PendingVerify`, we could directly cancel the job.
// There is already a binding with status `Using`, `PendingVerify` or `Rejected`, we could directly cancel the job.
if record.Bindings[0].Status == PendingVerify {
return nil
}
Expand Down Expand Up @@ -463,7 +475,9 @@ func (c cache) setBindRecord(hash string, meta *BindRecord) {
// copy returns a new cache in which every key maps to a freshly allocated
// slice, so that replacing or appending slice elements in the result does not
// touch the slices held by c. The *BindRecord values themselves are shared
// between c and the returned cache.
func (c cache) copy() cache {
	newCache := make(cache, len(c))
	for k, v := range c {
		bindRecords := make([]*BindRecord, len(v))
		copy(bindRecords, v)
		newCache[k] = bindRecords
	}
	return newCache
}
Expand Down Expand Up @@ -590,6 +604,168 @@ func (h *BindHandle) SaveEvolveTasksToStore() {
h.pendingVerifyBindRecordMap.flushToStore()
}

// getEvolveParameters reads the evolve-plan task settings from
// mysql.global_variables through ctx, which must implement
// sqlexec.RestrictedSQLExecutor.
//
// It returns, in order: the maximum running time of one evolve task (the
// stored value is interpreted as seconds), and the start and end of the daily
// time window, both parsed with variable.FullDayTimeFormat in UTC.
// A variable that has no row in the table keeps its default value.
// A non-nil error is returned when the query fails or a value cannot be parsed.
func getEvolveParameters(ctx sessionctx.Context) (time.Duration, time.Time, time.Time, error) {
	sql := fmt.Sprintf("select variable_name, variable_value from mysql.global_variables where variable_name in ('%s', '%s', '%s')",
		variable.TiDBEvolvePlanTaskMaxTime, variable.TiDBEvolvePlanTaskStartTime, variable.TiDBEvolvePlanTaskEndTime)
	rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
	if err != nil {
		return 0, time.Time{}, time.Time{}, err
	}
	// Start from the evolve-plan defaults; rows found in the table override them.
	maxTime := int64(variable.DefTiDBEvolvePlanTaskMaxTime)
	startTimeStr := variable.DefTiDBEvolvePlanTaskStartTime
	endTimeStr := variable.DefTiDBEvolvePlanTaskEndTime
	for _, row := range rows {
		switch row.GetString(0) {
		case variable.TiDBEvolvePlanTaskMaxTime:
			maxTime, err = strconv.ParseInt(row.GetString(1), 10, 64)
			if err != nil {
				return 0, time.Time{}, time.Time{}, err
			}
		case variable.TiDBEvolvePlanTaskStartTime:
			startTimeStr = row.GetString(1)
		case variable.TiDBEvolvePlanTaskEndTime:
			endTimeStr = row.GetString(1)
		}
	}
	startTime, err := time.ParseInLocation(variable.FullDayTimeFormat, startTimeStr, time.UTC)
	if err != nil {
		return 0, time.Time{}, time.Time{}, err
	}
	endTime, err := time.ParseInLocation(variable.FullDayTimeFormat, endTimeStr, time.UTC)
	if err != nil {
		return 0, time.Time{}, time.Time{}, err
	}
	return time.Duration(maxTime) * time.Second, startTime, endTime, nil
}

const (
// acceptFactor is the factor that decides whether a pending-verify plan is accepted.
// A pending-verify plan is accepted if it performs at least `acceptFactor` times better than the accepted plans.
acceptFactor = 1.5
// nextVerifyDuration is how long to wait before a rejected plan is verified again.
nextVerifyDuration = 7 * 24 * time.Hour
)

// getOnePendingVerifyJob walks the cached bind records and returns the
// original SQL, the database and the binding of one binding that should be
// verified now. A binding qualifies when its status is PendingVerify, or when
// it is Rejected and its update time is more than nextVerifyDuration in the
// past. If nothing qualifies, it returns empty strings and a zero Binding.
func (h *BindHandle) getOnePendingVerifyJob() (string, string, Binding) {
	records := h.bindInfo.Value.Load().(cache)
	for _, group := range records {
		for _, rec := range group {
			for _, candidate := range rec.Bindings {
				switch candidate.Status {
				case PendingVerify:
					return rec.OriginalSQL, rec.Db, candidate
				case Rejected:
					// A conversion error should not happen; such a binding is simply skipped.
					rejectedAt, err := candidate.UpdateTime.Time.GoTime(time.UTC)
					if err == nil && time.Since(rejectedAt) > nextVerifyDuration {
						// Rejected long enough ago, retry it now.
						return rec.OriginalSQL, rec.Db, candidate
					}
				}
			}
		}
	}
	return "", "", Binding{}
}

// getRunningDuration executes sql on sctx and measures how long it takes.
//
// When db is not empty, a `use` statement for it is executed first. The SQL is
// then run by runSQL in a separate goroutine and is given at most maxTime:
//   - if it finishes in time, the elapsed time is returned;
//   - if it fails, the error is returned with a zero duration;
//   - if the timer fires first, the execution context is cancelled, the
//     function waits for runSQL to report back, and -1 is returned with a nil
//     error.
func (h *BindHandle) getRunningDuration(sctx sessionctx.Context, db, sql string, maxTime time.Duration) (time.Duration, error) {
	ctx := context.TODO()
	if db != "" {
		_, err := sctx.(sqlexec.SQLExecutor).Execute(ctx, fmt.Sprintf("use `%s`", db))
		if err != nil {
			return 0, err
		}
	}
	ctx, cancelFunc := context.WithCancel(ctx)
	// Release the context on every return path.
	defer cancelFunc()
	timer := time.NewTimer(maxTime)
	// Stop the timer so it does not linger for up to maxTime when the SQL finishes early.
	defer timer.Stop()
	resultChan := make(chan error)
	startTime := time.Now()
	go runSQL(ctx, sctx, sql, resultChan)
	select {
	case err := <-resultChan:
		if err != nil {
			return 0, err
		}
		return time.Since(startTime), nil
	case <-timer.C:
		// Timed out: cancel the execution and wait until runSQL has reported
		// back, so the goroutine is done with sctx before we return.
		cancelFunc()
		<-resultChan
		return -1, nil
	}
}

// runSQL executes sql on sctx, drains the first returned record set chunk by
// chunk, closes it, and sends the outcome (nil on success) to resultChan.
// Exactly one value is sent on resultChan for every call; a panic raised while
// running is recovered and reported as an error carrying the stack trace.
func runSQL(ctx context.Context, sctx sessionctx.Context, sql string, resultChan chan<- error) {
	defer func() {
		if r := recover(); r != nil {
			buf := make([]byte, 4096)
			stackSize := runtime.Stack(buf, false)
			buf = buf[:stackSize]
			resultChan <- errors.New(fmt.Sprintf("run sql panicked: %v", string(buf)))
		}
	}()
	recordSets, err := sctx.(sqlexec.SQLExecutor).Execute(ctx, sql)
	if err != nil {
		// Execute may fail without returning any record set; only close one if
		// it exists, so the real error is reported instead of an index panic.
		if len(recordSets) > 0 {
			terror.Call(recordSets[0].Close)
		}
		resultChan <- err
		return
	}
	recordSet := recordSets[0]
	chk := recordSet.NewChunk()
	// Read until an error occurs or an empty chunk signals the end of the result.
	for {
		err = recordSet.Next(ctx, chk)
		if err != nil || chk.NumRows() == 0 {
			break
		}
	}
	terror.Call(recordSet.Close)
	resultChan <- err
}

// HandleEvolvePlanTask tries to evolve one plan task.
// It only handles one task per call so that each task uses the latest parameters.
//
// It does nothing when there is no job to verify, when the configured max time
// is 0, or when the current time is outside the configured daily window.
// Otherwise it runs the binding's SQL twice on sctx, once with plan baselines
// enabled and once with them disabled, and stores the binding as `Using` if
// the second run finished within its time budget, or `Rejected` if it timed
// out. The session's UsePlanBaselines value is restored before returning.
func (h *BindHandle) HandleEvolvePlanTask(sctx sessionctx.Context) error {
	originalSQL, db, binding := h.getOnePendingVerifyJob()
	if originalSQL == "" {
		return nil
	}
	maxTime, startTime, endTime, err := getEvolveParameters(sctx)
	if err != nil {
		return err
	}
	if maxTime == 0 || !timeutil.WithinDayTimePeriod(startTime, endTime, time.Now()) {
		return nil
	}
	// sctx may be a user session (e.g. `admin evolve bindings`), so put back
	// whatever value the session had once the verification is finished.
	oldUsePlanBaselines := sctx.GetSessionVars().UsePlanBaselines
	defer func() {
		sctx.GetSessionVars().UsePlanBaselines = oldUsePlanBaselines
	}()
	// dropTask removes the job from the bind records.
	dropTask := func() error {
		return h.DropBindRecord(sctx, sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema), &BindRecord{OriginalSQL: originalSQL, Db: db, Bindings: []Binding{binding}})
	}

	sctx.GetSessionVars().UsePlanBaselines = true
	acceptedPlanTime, err := h.getRunningDuration(sctx, db, binding.BindSQL, maxTime)
	// If we just return the error to the caller, this job will be retried again and again and cause endless logs,
	// since it is still in the bind record. Now we just drop it, and if it is actually retryable,
	// we hope that this evolve task can be captured again.
	if err != nil {
		return dropTask()
	}
	// If the accepted plan times out, it is hard to decide the timeout for the verify plan.
	// Currently we simply mark the verify plan as `using` if it could run successfully within maxTime.
	if acceptedPlanTime > 0 {
		maxTime = time.Duration(float64(acceptedPlanTime) / acceptFactor)
	}
	sctx.GetSessionVars().UsePlanBaselines = false
	verifyPlanTime, err := h.getRunningDuration(sctx, db, binding.BindSQL, maxTime)
	if err != nil {
		return dropTask()
	}
	// A negative duration means the run did not finish within maxTime.
	if verifyPlanTime < 0 {
		binding.Status = Rejected
	} else {
		binding.Status = Using
	}
	return h.AddBindRecord(sctx, sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema), &BindRecord{OriginalSQL: originalSQL, Db: db, Bindings: []Binding{binding}})
}

// Clear resets the bind handle. It is used for test.
func (h *BindHandle) Clear() {
h.bindInfo.Store(make(cache))
Expand Down
28 changes: 18 additions & 10 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,16 +830,17 @@ func (do *Domain) BindHandle() *bindinfo.BindHandle {

// LoadBindInfoLoop create a goroutine loads BindInfo in a loop, it should
// be called only once in BootstrapSession.
func (do *Domain) LoadBindInfoLoop(ctx sessionctx.Context) error {
ctx.GetSessionVars().InRestrictedSQL = true
do.bindHandle = bindinfo.NewBindHandle(ctx)
func (do *Domain) LoadBindInfoLoop(ctxForHandle sessionctx.Context, ctxForEvolve sessionctx.Context) error {
ctxForHandle.GetSessionVars().InRestrictedSQL = true
ctxForEvolve.GetSessionVars().InRestrictedSQL = true
do.bindHandle = bindinfo.NewBindHandle(ctxForHandle)
err := do.bindHandle.Update(true)
if err != nil || bindinfo.Lease == 0 {
return err
}

do.globalBindHandleWorkerLoop()
do.handleInvalidBindTaskLoop()
do.handleEvolvePlanTasksLoop(ctxForEvolve)
return nil
}

Expand All @@ -862,25 +863,32 @@ func (do *Domain) globalBindHandleWorkerLoop() {
if !variable.TiDBOptOn(variable.CapturePlanBaseline.GetVal()) {
continue
}
do.bindHandle.DropInvalidBindRecord()
do.bindHandle.CaptureBaselines()
do.bindHandle.SaveEvolveTasksToStore()
}
}
}()
}

func (do *Domain) handleInvalidBindTaskLoop() {
func (do *Domain) handleEvolvePlanTasksLoop(ctx sessionctx.Context) {
do.wg.Add(1)
go func() {
defer do.wg.Done()
defer recoverInDomain("loadBindInfoLoop-dropInvalidBindInfo", false)
defer recoverInDomain("handleEvolvePlanTasksLoop", false)
owner := do.newOwnerManager(bindinfo.Prompt, bindinfo.OwnerKey)
for {
select {
case <-do.exit:
return
case <-time.After(bindinfo.Lease):
}
do.bindHandle.DropInvalidBindRecord()
if owner.IsOwner() {
err := do.bindHandle.HandleEvolvePlanTask(ctx)
if err != nil {
logutil.BgLogger().Info("evolve plan failed", zap.Error(err))
}
}
}
}()
}
Expand Down Expand Up @@ -928,7 +936,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
if do.statsLease <= 0 {
return nil
}
owner := do.newStatsOwner()
owner := do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
do.wg.Add(1)
do.SetStatsUpdating(true)
go do.updateStatsWorker(ctx, owner)
Expand All @@ -939,14 +947,14 @@ func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
return nil
}

func (do *Domain) newStatsOwner() owner.Manager {
func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager {
id := do.ddl.OwnerManager().ID()
cancelCtx, cancelFunc := context.WithCancel(context.Background())
var statsOwner owner.Manager
if do.etcdClient == nil {
statsOwner = owner.NewMockManager(id, cancelFunc)
} else {
statsOwner = owner.NewOwnerManager(do.etcdClient, handle.StatsPrompt, id, handle.StatsOwnerKey, cancelFunc)
statsOwner = owner.NewOwnerManager(do.etcdClient, prompt, id, ownerKey, cancelFunc)
}
// TODO: Need to do something when err is not nil.
err := statsOwner.CampaignOwner(cancelCtx)
Expand Down
6 changes: 6 additions & 0 deletions executor/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func (e *SQLBindExec) Next(ctx context.Context, req *chunk.Chunk) error {
return e.flushBindings()
case plannercore.OpCaptureBindings:
e.captureBindings()
case plannercore.OpEvolveBindings:
return e.evolveBindings()
default:
return errors.Errorf("unsupported SQL bind operation: %v", e.sqlBindOp)
}
Expand Down Expand Up @@ -104,3 +106,7 @@ func (e *SQLBindExec) flushBindings() error {
// captureBindings asks the bind handle of the executor's domain to capture
// plan baselines.
func (e *SQLBindExec) captureBindings() {
	bindHandle := domain.GetDomain(e.ctx).BindHandle()
	bindHandle.CaptureBaselines()
}

// evolveBindings runs one evolve-plan task on the executor's own session
// context and returns the error reported by the bind handle, if any.
func (e *SQLBindExec) evolveBindings() error {
	bindHandle := domain.GetDomain(e.ctx).BindHandle()
	return bindHandle.HandleEvolvePlanTask(e.ctx)
}
6 changes: 5 additions & 1 deletion planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,11 @@ func handleEvolveTasks(ctx context.Context, sctx sessionctx.Context, br *bindinf
if err != nil {
logutil.Logger(ctx).Info("Restore SQL failed", zap.Error(err))
}
bindsql := strings.Replace(sb.String(), "SELECT", fmt.Sprintf("SELECT /*+ %s*/", planHint), 1)
bindSQL := sb.String()
selectIdx := strings.Index(bindSQL, "SELECT")
// Remove possible `explain` prefix.
bindSQL = bindSQL[selectIdx:]
bindsql := strings.Replace(bindSQL, "SELECT", fmt.Sprintf("SELECT /*+ %s*/", planHint), 1)
globalHandle := domain.GetDomain(sctx).BindHandle()
charset, collation := sctx.GetSessionVars().GetCharsetInfo()
binding := bindinfo.Binding{BindSQL: bindsql, Status: bindinfo.PendingVerify, Charset: charset, Collation: collation}
Expand Down
6 changes: 5 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1640,7 +1640,11 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
if err != nil {
return nil, err
}
err = dom.LoadBindInfoLoop(se2)
se3, err := createSession(store)
if err != nil {
return nil, err
}
err = dom.LoadBindInfoLoop(se2, se3)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,9 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBCapturePlanBaseline, "0"},
{ScopeGlobal | ScopeSession, TiDBUsePlanBaselines, BoolToIntStr(DefTiDBUsePlanBaselines)},
{ScopeGlobal | ScopeSession, TiDBEvolvePlanBaselines, BoolToIntStr(DefTiDBEvolvePlanBaselines)},
{ScopeGlobal, TiDBEvolvePlanTaskMaxTime, strconv.Itoa(DefTiDBEvolvePlanTaskMaxTime)},
{ScopeGlobal, TiDBEvolvePlanTaskStartTime, DefTiDBEvolvePlanTaskStartTime},
{ScopeGlobal, TiDBEvolvePlanTaskEndTime, DefTiDBEvolvePlanTaskEndTime},
{ScopeGlobal | ScopeSession, TiDBIsolationReadEngines, "tikv,tiflash"},
{ScopeGlobal | ScopeSession, TiDBStoreLimit, strconv.FormatInt(atomic.LoadInt64(&config.GetGlobalConfig().TiKVClient.StoreLimit), 10)},
}
Expand Down
Loading

0 comments on commit 518692c

Please sign in to comment.