Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into clustered-index-s…
Browse files Browse the repository at this point in the history
…yntax
  • Loading branch information
tangenta committed Feb 5, 2021
2 parents 437e55c + 75f7485 commit e532176
Show file tree
Hide file tree
Showing 274 changed files with 9,482 additions and 108,790 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ explain_test
cmd/explaintest/explain-test.out
cmd/explaintest/explaintest_tidb-server
cmd/explaintest/portgenerator
cmd/explaintest/s/
*.fail.go
tools/bin/
vendor
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ gotest: failpoint-enable
ifeq ("$(TRAVIS_COVERAGE)", "1")
@echo "Running in TRAVIS_COVERAGE mode."
$(GO) get github.com/go-playground/overalls
@export log_level=error; \
@export log_level=info; \
$(OVERALLS) -project=github.com/pingcap/tidb \
-covermode=count \
-ignore='.git,vendor,cmd,docs,tests,LICENSES' \
Expand All @@ -132,7 +132,7 @@ ifeq ("$(TRAVIS_COVERAGE)", "1")
|| { $(FAILPOINT_DISABLE); exit 1; }
else
@echo "Running in native mode."
@export log_level=fatal; export TZ='Asia/Shanghai'; \
@export log_level=info; export TZ='Asia/Shanghai'; \
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' $(EXTRA_TEST_ARGS) -cover $(PACKAGES) -check.p true -check.timeout 4s || { $(FAILPOINT_DISABLE); exit 1; }
endif
@$(FAILPOINT_DISABLE)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
- [**Slack Channel**](https://slack.tidb.io/invite?team=tidb-community&channel=everyone&ref=pingcap-tidb)
- **Twitter**: [@PingCAP](https://twitter.com/PingCAP)
- [**Reddit**](https://www.reddit.com/r/TiDB/)
- **Mailing list**: [Google Group](https://groups.google.com/forum/#!forum/tidb-user)
- **Mailing list**: [lists.tidb.io](https://lists.tidb.io/g/main/subgroups)
- [**For support, please contact PingCAP**](http://bit.ly/contact_us_via_github)

## What is TiDB?
Expand Down
159 changes: 80 additions & 79 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/parser/format"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -123,17 +122,22 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle {
func (h *BindHandle) Update(fullLoad bool) (err error) {
h.bindInfo.Lock()
lastUpdateTime := h.bindInfo.lastUpdateTime
updateTime := lastUpdateTime.String()
if fullLoad {
updateTime = "0000-00-00 00:00:00"
}

sql := "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source from mysql.bind_info"
if !fullLoad {
sql += " where update_time > \"" + lastUpdateTime.String() + "\""
exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.TODO(), `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
FROM mysql.bind_info WHERE update_time > %? ORDER BY update_time`, updateTime)
if err != nil {
return err
}
// We need to apply the updates by order, wrong apply order of same original sql may cause inconsistent state.
sql += " order by update_time"

// No need to acquire the session context lock for ExecRestrictedSQL, it
// No need to acquire the session context lock for ExecRestrictedStmt, it
// uses another background session.
rows, _, err := h.sctx.Context.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
rows, _, err := exec.ExecRestrictedStmt(context.Background(), stmt)

if err != nil {
h.bindInfo.Unlock()
return err
Expand Down Expand Up @@ -215,7 +219,7 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
return err
}
// Binding recreation should physically delete previous bindings.
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, ""))
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE original_sql = %?`, record.OriginalSQL)
if err != nil {
return err
}
Expand All @@ -227,7 +231,17 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
record.Bindings[i].UpdateTime = now

// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
_, err = exec.ExecuteInternal(context.TODO(), `INSERT INTO mysql.bind_info VALUES (%?,%?, %?, %?, %?, %?, %?, %?, %?)`,
record.OriginalSQL,
record.Bindings[i].BindSQL,
record.Db,
record.Bindings[i].Status,
record.Bindings[i].CreateTime.String(),
record.Bindings[i].UpdateTime.String(),
record.Bindings[i].Charset,
record.Bindings[i].Collation,
record.Bindings[i].Source,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -289,7 +303,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
return err
}
if duplicateBinding != nil {
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, duplicateBinding.BindSQL))
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE original_sql = %? AND bind_sql = %?`, record.OriginalSQL, duplicateBinding.BindSQL)
if err != nil {
return err
}
Expand All @@ -305,7 +319,17 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
record.Bindings[i].UpdateTime = now

// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
_, err = exec.ExecuteInternal(context.TODO(), `INSERT INTO mysql.bind_info VALUES (%?, %?, %?, %?, %?, %?, %?, %?, %?)`,
record.OriginalSQL,
record.Bindings[i].BindSQL,
record.Db,
record.Bindings[i].Status,
record.Bindings[i].CreateTime.String(),
record.Bindings[i].UpdateTime.String(),
record.Bindings[i].Charset,
record.Bindings[i].Collation,
record.Bindings[i].Source,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -349,17 +373,19 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(); err != nil {
return
return err
}

updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)
updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3).String()

bindSQL := ""
if binding != nil {
bindSQL = binding.BindSQL
if binding == nil {
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %?`,
deleted, updateTs, originalSQL, updateTs)
} else {
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND bind_sql = %?`,
deleted, updateTs, originalSQL, updateTs, binding.BindSQL)
}

_, err = exec.ExecuteInternal(context.TODO(), h.logicalDeleteBindInfoSQL(originalSQL, db, updateTs, bindSQL))
deleteRows = int(h.sctx.Context.GetSessionVars().StmtCtx.AffectedRows())
return err
}
Expand All @@ -377,6 +403,15 @@ func (h *BindHandle) lockBindInfoTable() error {
return err
}

// LockBindInfoSQL simulates LOCK TABLE by updating a same row in each pessimistic transaction.
func (h *BindHandle) LockBindInfoSQL() string {
sql, err := sqlexec.EscapeSQL("UPDATE mysql.bind_info SET source= %? WHERE original_sql= %?", Builtin, BuiltinPseudoSQL4BindLock)
if err != nil {
return ""
}
return sql
}

// tmpBindRecordMap is used to temporarily save bind record changes.
// Those changes will be flushed into store periodically.
type tmpBindRecordMap struct {
Expand Down Expand Up @@ -575,51 +610,6 @@ func (c cache) getBindRecord(hash, normdOrigSQL, db string) *BindRecord {
return nil
}

func (h *BindHandle) deleteBindInfoSQL(normdOrigSQL, db, bindSQL string) string {
sql := fmt.Sprintf(
`DELETE FROM mysql.bind_info WHERE original_sql=%s`,
expression.Quote(normdOrigSQL),
)
if bindSQL == "" {
return sql
}
return sql + fmt.Sprintf(` and bind_sql = %s`, expression.Quote(bindSQL))
}

func (h *BindHandle) insertBindInfoSQL(orignalSQL string, db string, info Binding) string {
return fmt.Sprintf(`INSERT INTO mysql.bind_info VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)`,
expression.Quote(orignalSQL),
expression.Quote(info.BindSQL),
expression.Quote(db),
expression.Quote(info.Status),
expression.Quote(info.CreateTime.String()),
expression.Quote(info.UpdateTime.String()),
expression.Quote(info.Charset),
expression.Quote(info.Collation),
expression.Quote(info.Source),
)
}

// LockBindInfoSQL simulates LOCK TABLE by updating a same row in each pessimistic transaction.
func (h *BindHandle) LockBindInfoSQL() string {
return fmt.Sprintf("UPDATE mysql.bind_info SET source=%s WHERE original_sql=%s",
expression.Quote(Builtin),
expression.Quote(BuiltinPseudoSQL4BindLock))
}

func (h *BindHandle) logicalDeleteBindInfoSQL(originalSQL, db string, updateTs types.Time, bindingSQL string) string {
updateTsStr := updateTs.String()
sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status=%s,update_time=%s WHERE original_sql=%s and update_time<%s`,
expression.Quote(deleted),
expression.Quote(updateTsStr),
expression.Quote(originalSQL),
expression.Quote(updateTsStr))
if bindingSQL == "" {
return sql
}
return sql + fmt.Sprintf(` and bind_sql = %s`, expression.Quote(bindingSQL))
}

// CaptureBaselines is used to automatically capture plan baselines.
func (h *BindHandle) CaptureBaselines() {
parser4Capture := parser.New()
Expand Down Expand Up @@ -661,16 +651,20 @@ func (h *BindHandle) CaptureBaselines() {
func getHintsForSQL(sctx sessionctx.Context, sql string) (string, error) {
origVals := sctx.GetSessionVars().UsePlanBaselines
sctx.GetSessionVars().UsePlanBaselines = false
recordSets, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), fmt.Sprintf("explain format='hint' %s", sql))

// Usually passing a sprintf to ExecuteInternal is not recommended, but in this case
// it is safe because ExecuteInternal does not permit MultiStatement execution. Thus,
// the statement won't be able to "break out" from EXPLAIN.
rs, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), fmt.Sprintf("EXPLAIN FORMAT='hint' %s", sql))
sctx.GetSessionVars().UsePlanBaselines = origVals
if len(recordSets) > 0 {
defer terror.Log(recordSets[0].Close())
if rs != nil {
defer terror.Call(rs.Close)
}
if err != nil {
return "", err
}
chk := recordSets[0].NewChunk()
err = recordSets[0].Next(context.TODO(), chk)
chk := rs.NewChunk()
err = rs.Next(context.TODO(), chk)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -766,9 +760,17 @@ func (h *BindHandle) SaveEvolveTasksToStore() {
}

func getEvolveParameters(ctx sessionctx.Context) (time.Duration, time.Time, time.Time, error) {
sql := fmt.Sprintf("select variable_name, variable_value from mysql.global_variables where variable_name in ('%s', '%s', '%s')",
variable.TiDBEvolvePlanTaskMaxTime, variable.TiDBEvolvePlanTaskStartTime, variable.TiDBEvolvePlanTaskEndTime)
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(
context.TODO(),
"SELECT variable_name, variable_value FROM mysql.global_variables WHERE variable_name IN (%?, %?, %?)",
variable.TiDBEvolvePlanTaskMaxTime,
variable.TiDBEvolvePlanTaskStartTime,
variable.TiDBEvolvePlanTaskEndTime,
)
if err != nil {
return 0, time.Time{}, time.Time{}, err
}
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.TODO(), stmt)
if err != nil {
return 0, time.Time{}, time.Time{}, err
}
Expand Down Expand Up @@ -839,7 +841,7 @@ func (h *BindHandle) getOnePendingVerifyJob() (string, string, Binding) {
func (h *BindHandle) getRunningDuration(sctx sessionctx.Context, db, sql string, maxTime time.Duration) (time.Duration, error) {
ctx := context.TODO()
if db != "" {
_, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, fmt.Sprintf("use `%s`", db))
_, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "use %n", db)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -873,23 +875,22 @@ func runSQL(ctx context.Context, sctx sessionctx.Context, sql string, resultChan
resultChan <- fmt.Errorf("run sql panicked: %v", string(buf))
}
}()
recordSets, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rs, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
if err != nil {
if len(recordSets) > 0 {
terror.Call(recordSets[0].Close)
if rs != nil {
terror.Call(rs.Close)
}
resultChan <- err
return
}
recordSet := recordSets[0]
chk := recordSets[0].NewChunk()
chk := rs.NewChunk()
for {
err = recordSet.Next(ctx, chk)
err = rs.Next(ctx, chk)
if err != nil || chk.NumRows() == 0 {
break
}
}
terror.Call(recordSets[0].Close)
terror.Call(rs.Close)
resultChan <- err
}

Expand Down
Loading

0 comments on commit e532176

Please sign in to comment.