Merge branch 'master' into tz_portable
jackysp authored Nov 22, 2019
2 parents c9e7a7d + 518692c commit 9080410
Showing 183 changed files with 6,030 additions and 2,796 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -155,7 +155,7 @@ ifeq ("$(TRAVIS_COVERAGE)", "1")
else
@echo "Running in native mode."
@export log_level=error; export TZ='Asia/Shanghai'; \
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' -cover $(PACKAGES) || { $(FAILPOINT_DISABLE); exit 1; }
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' -cover $(PACKAGES) -check.timeout 4s || { $(FAILPOINT_DISABLE); exit 1; }
endif
@$(FAILPOINT_DISABLE)

10 changes: 8 additions & 2 deletions bindinfo/bind_test.go
@@ -443,7 +443,7 @@ func (s *testSuite) TestCapturePlanBaseline(c *C) {
tk.MustQuery("show global bindings").Check(testkit.Rows())
tk.MustExec("select * from t")
tk.MustExec("select * from t")
s.domain.BindHandle().CaptureBaselines()
tk.MustExec("admin capture bindings")
rows := tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 1)
c.Assert(rows[0][0], Equals, "select * from t")
@@ -518,9 +518,15 @@ func (s *testSuite) TestAddEvolveTasks(c *C) {
// It cannot choose the table path although it has the lowest cost.
tk.MustQuery("select * from t where a >= 4 and b >= 1 and c = 0")
c.Assert(tk.Se.GetSessionVars().StmtCtx.IndexNames[0], Equals, "t:idx_a")
domain.GetDomain(tk.Se).BindHandle().SaveEvolveTasksToStore()
tk.MustExec("admin flush bindings")
rows := tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 2)
c.Assert(rows[1][1], Equals, "SELECT /*+ USE_INDEX(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a`>=4 AND `b`>=1 AND `c`=0")
c.Assert(rows[1][3], Equals, "pending verify")
tk.MustExec("admin evolve bindings")
rows = tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 2)
c.Assert(rows[1][1], Equals, "SELECT /*+ USE_INDEX(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a`>=4 AND `b`>=1 AND `c`=0")
status := rows[1][3].(string)
c.Assert(status == "using" || status == "rejected", IsTrue)
}
13 changes: 13 additions & 0 deletions bindinfo/cache.go
@@ -33,6 +33,9 @@ const (
Invalid = "invalid"
// PendingVerify means the bind info needs to be verified.
PendingVerify = "pending verify"
// Rejected means that the bind has been rejected after the verify process.
// We can retry it after a certain amount of time has passed.
Rejected = "rejected"
)

// Binding stores the basic bind hint info.
@@ -148,6 +151,16 @@ func (br *BindRecord) remove(deleted *BindRecord) *BindRecord {
return result
}

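// removeDeletedBindings returns a copy of this BindRecord that keeps only the bindings whose status is not `deleted`.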
func (br *BindRecord) removeDeletedBindings() *BindRecord {
result := BindRecord{OriginalSQL: br.OriginalSQL, Db: br.Db, Bindings: make([]Binding, 0, len(br.Bindings))}
for _, binding := range br.Bindings {
if binding.Status != deleted {
result.Bindings = append(result.Bindings, binding)
}
}
return &result
}

// shallowCopy shallow copies the BindRecord.
func (br *BindRecord) shallowCopy() *BindRecord {
result := BindRecord{
210 changes: 195 additions & 15 deletions bindinfo/handle.go
@@ -15,7 +15,10 @@ package bindinfo

import (
"context"
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -29,12 +32,14 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/timeutil"
"go.uber.org/zap"
)

@@ -65,7 +70,8 @@ type BindHandle struct {
bindInfo struct {
sync.Mutex
atomic.Value
parser *parser.Parser
parser *parser.Parser
lastUpdateTime types.Time
}

// invalidBindRecordMap indicates the invalid bind records found during querying.
@@ -74,15 +80,18 @@ type BindHandle struct {

// pendingVerifyBindRecordMap indicates the pending verify bind records found during querying.
pendingVerifyBindRecordMap tmpBindRecordMap

lastUpdateTime types.Time

parser4Baseline *parser.Parser
}

// Lease influences the duration of loading bind info and handling invalid bind.
var Lease = 3 * time.Second

const (
// OwnerKey is the bindinfo owner path that is saved to etcd.
OwnerKey = "/tidb/bindinfo/owner"
// Prompt is the prompt for bindinfo owner manager.
Prompt = "bindinfo"
)

type bindRecordUpdate struct {
bindRecord *BindRecord
updateTime time.Time
@@ -94,7 +103,6 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle {
handle.sctx.Context = ctx
handle.bindInfo.Value.Store(make(cache, 32))
handle.bindInfo.parser = parser.New()
handle.parser4Baseline = parser.New()
handle.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
handle.invalidBindRecordMap.flushFunc = func(record *BindRecord) error {
// We do not need the first two parameters because they are only used to generate hints,
@@ -112,10 +120,16 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle {

// Update updates the global sql bind cache.
func (h *BindHandle) Update(fullLoad bool) (err error) {
h.bindInfo.Lock()
lastUpdateTime := h.bindInfo.lastUpdateTime
h.bindInfo.Unlock()

sql := "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation from mysql.bind_info"
if !fullLoad {
sql += " where update_time >= \"" + h.lastUpdateTime.String() + "\""
sql += " where update_time >= \"" + lastUpdateTime.String() + "\""
}
// We need to apply the updates in order; applying updates for the same original SQL in the wrong order may cause an inconsistent state.
sql += " order by update_time"

// No need to acquire the session context lock for ExecRestrictedSQL, it
// uses another background session.
@@ -128,24 +142,25 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {
h.bindInfo.Lock()
newCache := h.bindInfo.Value.Load().(cache).copy()
defer func() {
h.bindInfo.lastUpdateTime = lastUpdateTime
h.bindInfo.Value.Store(newCache)
h.bindInfo.Unlock()
}()

for _, row := range rows {
hash, meta, err := h.newBindRecord(row)
// Update lastUpdateTime to the newest one.
if meta.Bindings[0].UpdateTime.Compare(h.lastUpdateTime) > 0 {
h.lastUpdateTime = meta.Bindings[0].UpdateTime
if meta.Bindings[0].UpdateTime.Compare(lastUpdateTime) > 0 {
lastUpdateTime = meta.Bindings[0].UpdateTime
}
if err != nil {
logutil.BgLogger().Error("update bindinfo failed", zap.Error(err))
continue
}

oldRecord := newCache.getBindRecord(hash, meta.OriginalSQL, meta.Db)
newRecord := merge(oldRecord, meta)
if meta.HasUsingBinding() {
newRecord := merge(oldRecord, meta).removeDeletedBindings()
if len(newRecord.Bindings) > 0 {
newCache.setBindRecord(hash, newRecord)
} else {
newCache.removeDeletedBindRecord(hash, oldRecord)
Expand All @@ -167,7 +182,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.InfoSc
if br != nil {
binding := br.FindBinding(record.Bindings[0].id)
if binding != nil {
// There is already a binding with status `Using`, `PendingVerify` or `Rejected`, so we can directly cancel the job.
// There is already a binding with status `Using`, `PendingVerify` or `Rejected`, we could directly cancel the job.
if record.Bindings[0].Status == PendingVerify {
return nil
}
@@ -460,7 +475,9 @@ func (c cache) setBindRecord(hash string, meta *BindRecord) {
func (c cache) copy() cache {
newCache := make(cache, len(c))
for k, v := range c {
newCache[k] = v
bindRecords := make([]*BindRecord, len(v))
copy(bindRecords, v)
newCache[k] = bindRecords
}
return newCache
}
@@ -528,9 +545,10 @@ var GenHintsFromSQL func(ctx context.Context, sctx sessionctx.Context, node ast.

// CaptureBaselines is used to automatically capture plan baselines.
func (h *BindHandle) CaptureBaselines() {
parser4Capture := parser.New()
schemas, sqls := stmtsummary.StmtSummaryByDigestMap.GetMoreThanOnceSelect()
for i := range sqls {
stmt, err := h.parser4Baseline.ParseOneStmt(sqls[i], "", "")
stmt, err := parser4Capture.ParseOneStmt(sqls[i], "", "")
if err != nil {
logutil.BgLogger().Debug("parse SQL failed", zap.String("SQL", sqls[i]), zap.Error(err))
continue
@@ -586,9 +604,171 @@ func (h *BindHandle) SaveEvolveTasksToStore() {
h.pendingVerifyBindRecordMap.flushToStore()
}

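// getEvolveParameters reads the evolve task parameters (maximum execution time and the allowed start/end time window)
// from mysql.global_variables, falling back to the default values when a variable is not set.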
func getEvolveParameters(ctx sessionctx.Context) (time.Duration, time.Time, time.Time, error) {
sql := fmt.Sprintf("select variable_name, variable_value from mysql.global_variables where variable_name in ('%s', '%s', '%s')",
variable.TiDBEvolvePlanTaskMaxTime, variable.TiDBEvolvePlanTaskStartTime, variable.TiDBEvolvePlanTaskEndTime)
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
return 0, time.Time{}, time.Time{}, err
}
maxTime, startTimeStr, endTimeStr := int64(variable.DefTiDBEvolvePlanTaskMaxTime), variable.DefTiDBEvolvePlanTaskStartTime, variable.DefAutoAnalyzeEndTime
for _, row := range rows {
switch row.GetString(0) {
case variable.TiDBEvolvePlanTaskMaxTime:
maxTime, err = strconv.ParseInt(row.GetString(1), 10, 64)
if err != nil {
return 0, time.Time{}, time.Time{}, err
}
case variable.TiDBEvolvePlanTaskStartTime:
startTimeStr = row.GetString(1)
case variable.TiDBEvolvePlanTaskEndTime:
endTimeStr = row.GetString(1)
}
}
startTime, err := time.ParseInLocation(variable.FullDayTimeFormat, startTimeStr, time.UTC)
if err != nil {
return 0, time.Time{}, time.Time{}, err
}
endTime, err := time.ParseInLocation(variable.FullDayTimeFormat, endTimeStr, time.UTC)
if err != nil {
return 0, time.Time{}, time.Time{}, err
}
return time.Duration(maxTime) * time.Second, startTime, endTime, nil
}

const (
// acceptFactor is the factor used to decide whether we should accept the pending verified plan.
// A pending verified plan will be accepted if it performs at least `acceptFactor` times better than the accepted plans.
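// For example, if the currently accepted plan finishes in 600ms, a candidate plan is only accepted
// when it finishes within 600ms / 1.5 = 400ms (see HandleEvolvePlanTask).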
acceptFactor = 1.5
// nextVerifyDuration is the duration after which we will retry the rejected plans.
nextVerifyDuration = 7 * 24 * time.Hour
)

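// getOnePendingVerifyJob returns one binding that needs verification: either a binding in `PendingVerify` status,
// or a `Rejected` binding whose last verification happened more than nextVerifyDuration ago.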
func (h *BindHandle) getOnePendingVerifyJob() (string, string, Binding) {
cache := h.bindInfo.Value.Load().(cache)
for _, bindRecords := range cache {
for _, bindRecord := range bindRecords {
for _, bind := range bindRecord.Bindings {
if bind.Status == PendingVerify {
return bindRecord.OriginalSQL, bindRecord.Db, bind
}
if bind.Status != Rejected {
continue
}
updateTime, err := bind.UpdateTime.Time.GoTime(time.UTC)
// Should not happen.
if err != nil {
continue
}
// The binding was rejected long enough ago; retry it now.
if time.Since(updateTime) > nextVerifyDuration {
return bindRecord.OriginalSQL, bindRecord.Db, bind
}
}
}
}
return "", "", Binding{}
}

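// getRunningDuration executes the SQL in the given database and returns how long it takes.
// It returns -1 with a nil error when the statement does not finish within maxTime; in that case the context is canceled.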
func (h *BindHandle) getRunningDuration(sctx sessionctx.Context, db, sql string, maxTime time.Duration) (time.Duration, error) {
ctx := context.TODO()
if db != "" {
_, err := sctx.(sqlexec.SQLExecutor).Execute(ctx, fmt.Sprintf("use `%s`", db))
if err != nil {
return 0, err
}
}
ctx, cancelFunc := context.WithCancel(ctx)
timer := time.NewTimer(maxTime)
resultChan := make(chan error)
startTime := time.Now()
go runSQL(ctx, sctx, sql, resultChan)
select {
case err := <-resultChan:
cancelFunc()
if err != nil {
return 0, err
}
return time.Since(startTime), nil
case <-timer.C:
cancelFunc()
}
<-resultChan
return -1, nil
}

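// runSQL executes the SQL, drains its result set, and reports the final error (nil on success) through resultChan.
// A panic during execution is recovered and converted into an error.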
func runSQL(ctx context.Context, sctx sessionctx.Context, sql string, resultChan chan<- error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
stackSize := runtime.Stack(buf, false)
buf = buf[:stackSize]
resultChan <- errors.New(fmt.Sprintf("run sql panicked: %v", string(buf)))
}
}()
recordSets, err := sctx.(sqlexec.SQLExecutor).Execute(ctx, sql)
if err != nil {
// Execute may fail without returning any record set; guard against indexing an empty slice.
if len(recordSets) > 0 {
terror.Call(recordSets[0].Close)
}
resultChan <- err
return
}
recordSet := recordSets[0]
chk := recordSets[0].NewChunk()
for {
err = recordSet.Next(ctx, chk)
if err != nil || chk.NumRows() == 0 {
break
}
}
terror.Call(recordSets[0].Close)
resultChan <- err
return
}

// HandleEvolvePlanTask tries to evolve one plan task.
// It only handles one task at a time so that each task can use the latest parameters.
func (h *BindHandle) HandleEvolvePlanTask(sctx sessionctx.Context) error {
originalSQL, db, binding := h.getOnePendingVerifyJob()
if originalSQL == "" {
return nil
}
maxTime, startTime, endTime, err := getEvolveParameters(sctx)
if err != nil {
return err
}
if maxTime == 0 || !timeutil.WithinDayTimePeriod(startTime, endTime, time.Now()) {
return nil
}
sctx.GetSessionVars().UsePlanBaselines = true
acceptedPlanTime, err := h.getRunningDuration(sctx, db, binding.BindSQL, maxTime)
// If we simply returned the error to the caller, this job would be retried again and again and produce endless logs,
// since it is still in the bind record. Instead we just drop the binding; if the failure is actually retryable,
// we hope to capture this evolve task again later.
if err != nil {
return h.DropBindRecord(sctx, sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema), &BindRecord{OriginalSQL: originalSQL, Db: db, Bindings: []Binding{binding}})
}
// If the accepted plan times out, it is hard to decide a proper timeout for the verify plan.
// Currently we simply mark the verify plan as `using` if it can run successfully within maxTime.
if acceptedPlanTime > 0 {
maxTime = time.Duration(float64(acceptedPlanTime) / acceptFactor)
}
sctx.GetSessionVars().UsePlanBaselines = false
verifyPlanTime, err := h.getRunningDuration(sctx, db, binding.BindSQL, maxTime)
if err != nil {
return h.DropBindRecord(sctx, sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema), &BindRecord{OriginalSQL: originalSQL, Db: db, Bindings: []Binding{binding}})
}
if verifyPlanTime < 0 {
binding.Status = Rejected
} else {
binding.Status = Using
}
return h.AddBindRecord(sctx, sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema), &BindRecord{OriginalSQL: originalSQL, Db: db, Bindings: []Binding{binding}})
}

// Clear resets the bind handle. It is used for test.
func (h *BindHandle) Clear() {
h.bindInfo.Store(make(cache))
h.invalidBindRecordMap.Store(make(map[string]*bindRecordUpdate))
h.lastUpdateTime = types.ZeroTimestamp
h.bindInfo.lastUpdateTime = types.ZeroTimestamp
}
3 changes: 0 additions & 3 deletions cmd/benchdb/main.go
@@ -48,9 +48,6 @@ var (
"gc",
"select:0_10000:10",
}, "|"), "jobs to run")
sslCA = flag.String("cacert", "", "path of file that contains list of trusted SSL CAs.")
sslCert = flag.String("cert", "", "path of file that contains X509 certificate in PEM format.")
sslKey = flag.String("key", "", "path of file that contains X509 key in PEM format.")
)

func main() {