Skip to content

Commit

Permalink
client: KV requests with source label (#34833)
Browse files Browse the repository at this point in the history
close #33963
  • Loading branch information
you06 authored Jul 1, 2022
1 parent b71a23b commit 65e0b8e
Show file tree
Hide file tree
Showing 119 changed files with 1,346 additions and 577 deletions.
8 changes: 4 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1941,8 +1941,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:dsMpneacHyuVslSVndgUfJKrXFNG7VPdXip2ulG6glo=",
version = "v0.0.0-20220517085838-12e2f5a9d167",
sum = "h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw=",
version = "v0.0.0-20220525022339-6aaebf466305",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -2292,8 +2292,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:N5ivsNkDQDgimY0ZVqMnWqXjEnxy5uFChoB4wPIKpPI=",
version = "v2.0.1-0.20220613112734-be31f33ba03b",
sum = "h1:VAyYcN1Nw7RupQszUYqOkueEVapWSxKFU7uBaYY5Dv8=",
version = "v2.0.1-0.20220627063500-947d923945fd",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
1 change: 1 addition & 0 deletions bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
importpath = "github.com/pingcap/tidb/bindinfo",
visibility = ["//visibility:public"],
deps = [
"//kv",
"//metrics",
"//parser",
"//parser/ast",
Expand Down
75 changes: 43 additions & 32 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -134,9 +135,10 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {

exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo)
// No need to acquire the session context lock for ExecRestrictedSQL, it
// uses another background session.
rows, _, err := exec.ExecRestrictedSQL(context.TODO(), nil, `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
rows, _, err := exec.ExecRestrictedSQL(ctx, nil, `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
FROM mysql.bind_info WHERE update_time > %? ORDER BY update_time, create_time`, updateTime)

if err != nil {
Expand Down Expand Up @@ -209,20 +211,21 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
h.sctx.Unlock()
h.bindInfo.Unlock()
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo)
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
_, err = exec.ExecuteInternal(ctx, "BEGIN PESSIMISTIC")
if err != nil {
return
}

defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
_, err1 := exec.ExecuteInternal(ctx, "ROLLBACK")
terror.Log(err1)
return
}

_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
_, err = exec.ExecuteInternal(ctx, "COMMIT")
if err != nil {
return
}
Expand All @@ -239,7 +242,7 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)

updateTs := now.String()
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %?`,
_, err = exec.ExecuteInternal(ctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %?`,
deleted, updateTs, record.OriginalSQL, updateTs)
if err != nil {
return err
Expand All @@ -250,7 +253,7 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
record.Bindings[i].UpdateTime = now

// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), `INSERT INTO mysql.bind_info VALUES (%?,%?, %?, %?, %?, %?, %?, %?, %?)`,
_, err = exec.ExecuteInternal(ctx, `INSERT INTO mysql.bind_info VALUES (%?,%?, %?, %?, %?, %?, %?, %?, %?)`,
record.OriginalSQL,
record.Bindings[i].BindSQL,
record.Db,
Expand Down Expand Up @@ -296,20 +299,21 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
h.sctx.Unlock()
h.bindInfo.Unlock()
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo)
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
_, err = exec.ExecuteInternal(ctx, "BEGIN PESSIMISTIC")
if err != nil {
return
}

defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
_, err1 := exec.ExecuteInternal(ctx, "ROLLBACK")
terror.Log(err1)
return
}

_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
_, err = exec.ExecuteInternal(ctx, "COMMIT")
if err != nil {
return
}
Expand All @@ -322,7 +326,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
return err
}
if duplicateBinding != nil {
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE original_sql = %? AND bind_sql = %?`, record.OriginalSQL, duplicateBinding.BindSQL)
_, err = exec.ExecuteInternal(ctx, `DELETE FROM mysql.bind_info WHERE original_sql = %? AND bind_sql = %?`, record.OriginalSQL, duplicateBinding.BindSQL)
if err != nil {
return err
}
Expand All @@ -338,7 +342,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
record.Bindings[i].UpdateTime = now

// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), `INSERT INTO mysql.bind_info VALUES (%?, %?, %?, %?, %?, %?, %?, %?, %?)`,
_, err = exec.ExecuteInternal(ctx, `INSERT INTO mysql.bind_info VALUES (%?, %?, %?, %?, %?, %?, %?, %?, %?)`,
record.OriginalSQL,
record.Bindings[i].BindSQL,
record.Db,
Expand All @@ -365,20 +369,21 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e
h.sctx.Unlock()
h.bindInfo.Unlock()
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo)
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
_, err = exec.ExecuteInternal(ctx, "BEGIN PESSIMISTIC")
if err != nil {
return err
}
var deleteRows int
defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
_, err1 := exec.ExecuteInternal(ctx, "ROLLBACK")
terror.Log(err1)
return
}

_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
_, err = exec.ExecuteInternal(ctx, "COMMIT")
if err != nil || deleteRows == 0 {
return
}
Expand All @@ -398,10 +403,10 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e
updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3).String()

if binding == nil {
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND status != %?`,
_, err = exec.ExecuteInternal(ctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND status != %?`,
deleted, updateTs, originalSQL, updateTs, deleted)
} else {
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND bind_sql = %? and status != %?`,
_, err = exec.ExecuteInternal(ctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND bind_sql = %? and status != %?`,
deleted, updateTs, originalSQL, updateTs, binding.BindSQL, deleted)
}

Expand All @@ -417,8 +422,9 @@ func (h *BindHandle) SetBindRecordStatus(originalSQL string, binding *Binding, n
h.sctx.Unlock()
h.bindInfo.Unlock()
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo)
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
_, err = exec.ExecuteInternal(ctx, "BEGIN PESSIMISTIC")
if err != nil {
return
}
Expand All @@ -439,12 +445,12 @@ func (h *BindHandle) SetBindRecordStatus(originalSQL string, binding *Binding, n
}
defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
_, err1 := exec.ExecuteInternal(ctx, "ROLLBACK")
terror.Log(err1)
return
}

_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
_, err = exec.ExecuteInternal(ctx, "COMMIT")
if err != nil {
return
}
Expand Down Expand Up @@ -485,10 +491,10 @@ func (h *BindHandle) SetBindRecordStatus(originalSQL string, binding *Binding, n
updateTsStr := updateTs.String()

if binding == nil {
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND status IN (%?, %?)`,
_, err = exec.ExecuteInternal(ctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND status IN (%?, %?)`,
newStatus, updateTsStr, originalSQL, updateTsStr, oldStatus0, oldStatus1)
} else {
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND bind_sql = %? AND status IN (%?, %?)`,
_, err = exec.ExecuteInternal(ctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND bind_sql = %? AND status IN (%?, %?)`,
newStatus, updateTsStr, originalSQL, updateTsStr, binding.BindSQL, oldStatus0, oldStatus1)
}
affectRows = int(h.sctx.Context.GetSessionVars().StmtCtx.AffectedRows())
Expand All @@ -504,18 +510,19 @@ func (h *BindHandle) GCBindRecord() (err error) {
h.bindInfo.Unlock()
}()
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo)
_, err = exec.ExecuteInternal(ctx, "BEGIN PESSIMISTIC")
if err != nil {
return err
}
defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
_, err1 := exec.ExecuteInternal(ctx, "ROLLBACK")
terror.Log(err1)
return
}

_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
_, err = exec.ExecuteInternal(ctx, "COMMIT")
if err != nil {
return
}
Expand All @@ -530,7 +537,7 @@ func (h *BindHandle) GCBindRecord() (err error) {
// we only garbage collect those records with update_time before 10 leases.
updateTime := time.Now().Add(-(10 * Lease))
updateTimeStr := types.NewTime(types.FromGoTime(updateTime), mysql.TypeTimestamp, 3).String()
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE status = 'deleted' and update_time < %?`, updateTimeStr)
_, err = exec.ExecuteInternal(ctx, `DELETE FROM mysql.bind_info WHERE status = 'deleted' and update_time < %?`, updateTimeStr)
return err
}

Expand All @@ -542,8 +549,9 @@ func (h *BindHandle) GCBindRecord() (err error) {
// even if they come from different tidb instances.
func (h *BindHandle) lockBindInfoTable() error {
// h.sctx already locked.
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo)
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(context.TODO(), h.LockBindInfoSQL())
_, err := exec.ExecuteInternal(ctx, h.LockBindInfoSQL())
return err
}

Expand Down Expand Up @@ -790,9 +798,10 @@ func (h *BindHandle) extractCaptureFilterFromStorage() (filter *captureFilter) {
users: make(map[string]struct{}),
}
exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo)
// No need to acquire the session context lock for ExecRestrictedSQL, it
// uses another background session.
rows, _, err := exec.ExecRestrictedSQL(context.TODO(), nil, `SELECT filter_type, filter_value FROM mysql.capture_plan_baselines_blacklist order by filter_type`)
rows, _, err := exec.ExecRestrictedSQL(ctx, nil, `SELECT filter_type, filter_value FROM mysql.capture_plan_baselines_blacklist order by filter_type`)
if err != nil {
logutil.BgLogger().Warn("[sql-bind] failed to load mysql.capture_plan_baselines_blacklist", zap.Error(err))
return
Expand Down Expand Up @@ -898,7 +907,8 @@ func getHintsForSQL(sctx sessionctx.Context, sql string) (string, error) {
// Usually passing a sprintf to ExecuteInternal is not recommended, but in this case
// it is safe because ExecuteInternal does not permit MultiStatement execution. Thus,
// the statement won't be able to "break out" from EXPLAIN.
rs, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), fmt.Sprintf("EXPLAIN FORMAT='hint' %s", sql))
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo)
rs, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, fmt.Sprintf("EXPLAIN FORMAT='hint' %s", sql))
sctx.GetSessionVars().UsePlanBaselines = origVals
if rs != nil {
defer func() {
Expand Down Expand Up @@ -1018,9 +1028,10 @@ func (h *BindHandle) SaveEvolveTasksToStore() {
h.pendingVerifyBindRecordMap.flushToStore()
}

func getEvolveParameters(ctx sessionctx.Context) (time.Duration, time.Time, time.Time, error) {
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(
context.TODO(),
func getEvolveParameters(sctx sessionctx.Context) (time.Duration, time.Time, time.Time, error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo)
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(
ctx,
nil,
"SELECT variable_name, variable_value FROM mysql.global_variables WHERE variable_name IN (%?, %?, %?)",
variable.TiDBEvolvePlanTaskMaxTime,
Expand Down Expand Up @@ -1093,7 +1104,7 @@ func (h *BindHandle) getOnePendingVerifyJob() (string, string, Binding) {
}

func (h *BindHandle) getRunningDuration(sctx sessionctx.Context, db, sql string, maxTime time.Duration) (time.Duration, error) {
ctx := context.TODO()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo)
if db != "" {
_, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "use %n", db)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
}

func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
rs, err := gs.se.ExecuteInternal(ctx, sql, args...)
if err != nil {
return errors.Trace(err)
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,7 @@ func (rc *Client) GenGlobalID(ctx context.Context) (int64, error) {
var id int64
storage := rc.GetDomain().Store()

ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
err := kv.RunInNewTxn(
ctx,
storage,
Expand All @@ -1942,6 +1943,7 @@ func (rc *Client) GenGlobalIDs(ctx context.Context, n int) ([]int64, error) {
ids := make([]int64, 0)
storage := rc.GetDomain().Store()

ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
err := kv.RunInNewTxn(
ctx,
storage,
Expand All @@ -1961,6 +1963,7 @@ func (rc *Client) UpdateSchemaVersion(ctx context.Context) error {
storage := rc.GetDomain().Store()
var schemaVersion int64

ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
if err := kv.RunInNewTxn(
ctx,
storage,
Expand Down
11 changes: 8 additions & 3 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
defer func() {
closeBackfillWorkers(backfillWorkers)
}()
jc := w.jobContext(job)

for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey)
Expand Down Expand Up @@ -647,19 +648,19 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba

switch bfWorkerType {
case typeAddIndexWorker:
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo)
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo, jc)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
case typeUpdateColumnWorker:
// Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting.
sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true
updateWorker := newUpdateColumnWorker(sessCtx, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo)
updateWorker := newUpdateColumnWorker(sessCtx, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo, jc)
updateWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker)
go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker, job)
case typeCleanUpIndexWorker:
idxWorker := newCleanUpIndexWorker(sessCtx, w, i, t, decodeColMap, reorgInfo)
idxWorker := newCleanUpIndexWorker(sessCtx, w, i, t, decodeColMap, reorgInfo, jc)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
Expand Down Expand Up @@ -733,6 +734,8 @@ func iterateSnapshotRows(ctx *JobContext, store kv.Storage, priority int, t tabl
ver := kv.Version{Ver: version}
snap := store.GetSnapshot(ver)
snap.SetOption(kv.Priority, priority)
snap.SetOption(kv.RequestSourceInternal, true)
snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType())
if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil {
snap.SetOption(kv.ResourceGroupTagger, tagger)
}
Expand Down Expand Up @@ -778,6 +781,8 @@ func getRangeEndKey(ctx *JobContext, store kv.Storage, priority int, t table.Tab
if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil {
snap.SetOption(kv.ResourceGroupTagger, tagger)
}
snap.SetOption(kv.RequestSourceInternal, true)
snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType())
it, err := snap.IterReverse(endKey.Next())
if err != nil {
return nil, errors.Trace(err)
Expand Down
Loading

0 comments on commit 65e0b8e

Please sign in to comment.