Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: refactor ts acquisition within build and execute phases #35376

Merged
merged 49 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
cc05adf
update
SpadeA-Tang Jun 13, 2022
0f136bd
update
SpadeA-Tang Jun 13, 2022
9ad4492
update
SpadeA-Tang Jun 14, 2022
9a68f75
update
SpadeA-Tang Jun 14, 2022
ddd7ed4
update
SpadeA-Tang Jun 14, 2022
142f55e
update
SpadeA-Tang Jun 14, 2022
ebb581f
update
SpadeA-Tang Jun 15, 2022
b595d3d
update
SpadeA-Tang Jun 15, 2022
673d389
update
SpadeA-Tang Jun 15, 2022
0b431c2
update
SpadeA-Tang Jun 15, 2022
516530b
update
SpadeA-Tang Jun 15, 2022
4c832a2
update
SpadeA-Tang Jun 15, 2022
5e7fc56
checkpoint
SpadeA-Tang Jun 16, 2022
781ce8f
update
SpadeA-Tang Jun 16, 2022
b54c368
save
SpadeA-Tang Jun 16, 2022
139e506
update
SpadeA-Tang Jun 18, 2022
cff38ac
update
SpadeA-Tang Jun 18, 2022
b3dd7ee
update
SpadeA-Tang Jun 18, 2022
3c7d1d3
delete unrelated test
SpadeA-Tang Jun 18, 2022
0b70f0e
modify some comments
SpadeA-Tang Jun 19, 2022
9a500e9
update
SpadeA-Tang Jun 20, 2022
b88d5ab
update
SpadeA-Tang Jun 20, 2022
7b40adc
update
SpadeA-Tang Jun 20, 2022
4906089
update
SpadeA-Tang Jun 20, 2022
bd12553
update
SpadeA-Tang Jun 20, 2022
350efbb
update
SpadeA-Tang Jun 20, 2022
2f28146
update
SpadeA-Tang Jun 20, 2022
4e52ee6
fmt
SpadeA-Tang Jun 20, 2022
4700b48
Merge branch 'master' into removeTxnForUpdateTS
lcwangchao Jun 20, 2022
1c5a1e9
update optimizeForNotFetchingLatestTS
SpadeA-Tang Jun 21, 2022
aa9c357
Merge branch 'master' into removeTxnForUpdateTS
SpadeA-Tang Jun 21, 2022
be1c611
update
SpadeA-Tang Jun 22, 2022
83c5fa9
remove unused code
SpadeA-Tang Jun 22, 2022
1da2557
update
SpadeA-Tang Jun 23, 2022
251a875
update
SpadeA-Tang Jun 23, 2022
fe66d11
update
SpadeA-Tang Jun 23, 2022
3cfb2fa
udpate
SpadeA-Tang Jun 24, 2022
035a2bd
Merge branch 'master' into removeTxnForUpdateTS
SpadeA-Tang Jun 24, 2022
0d3e267
update
SpadeA-Tang Jun 24, 2022
d83319b
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
3be9075
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
f519b42
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
1ea7192
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
168f950
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
2eccd2e
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
303dbd7
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
26d11e3
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 27, 2022
2d4caf4
update
SpadeA-Tang Jun 27, 2022
1092f63
Merge branch 'removeTxnForUpdateTS' of github.com:SpadeA-Tang/tidb in…
SpadeA-Tang Jun 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
} else {
// CachedPlan type is already checked in last step
pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan, startTs)
if err := exec.Init(pointGetPlan, true, false); err != nil {
return nil, err
}
a.PsStmt.Executor = exec
}
}
Expand Down Expand Up @@ -708,7 +710,10 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
keys = filterTemporaryTableKeys(sctx.GetSessionVars(), keys)
seVars := sctx.GetSessionVars()
keys = filterLockTableKeys(seVars.StmtCtx, keys)
lockCtx := newLockCtx(seVars, seVars.LockWaitTimeout, len(keys))
lockCtx, err := newLockCtx(sctx, seVars.LockWaitTimeout, len(keys))
if err != nil {
return err
}
var lockKeyStats *util.LockKeysDetails
ctx = context.WithValue(ctx, util.LockKeysDetailCtxKey, &lockKeyStats)
startLocking := time.Now()
Expand Down Expand Up @@ -767,6 +772,13 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
if lockErr == nil {
return nil, nil
}
failpoint.Inject("assertPessimisticLockErr", func() {
if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
sessiontxn.AddEntrance(a.Ctx, "insertWriteConflict")
SpadeA-Tang marked this conversation as resolved.
Show resolved Hide resolved
} else if terror.ErrorEqual(kv.ErrKeyExists, lockErr) {
sessiontxn.AddEntrance(a.Ctx, "insertDuplicateKey")
}
})

defer func() {
if _, ok := errors.Cause(err).(*tikverr.ErrDeadlock); ok {
Expand Down
14 changes: 7 additions & 7 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ func (e *BatchPointGetExec) Open(context.Context) error {
sessVars := e.ctx.GetSessionVars()
txnCtx := sessVars.TxnCtx
stmtCtx := sessVars.StmtCtx
if e.lock {
e.snapshotTS = txnCtx.GetForUpdateTS()
}
txn, err := e.ctx.Txn(false)
if err != nil {
return err
Expand Down Expand Up @@ -540,13 +537,16 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
}

// LockKeys locks the keys for pessimistic transaction.
func LockKeys(ctx context.Context, seCtx sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error {
txnCtx := seCtx.GetSessionVars().TxnCtx
lctx := newLockCtx(seCtx.GetSessionVars(), lockWaitTime, len(keys))
func LockKeys(ctx context.Context, sctx sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error {
txnCtx := sctx.GetSessionVars().TxnCtx
lctx, err := newLockCtx(sctx, lockWaitTime, len(keys))
if err != nil {
return err
}
if txnCtx.IsPessimistic {
lctx.InitReturnValues(len(keys))
}
err := doLockKeys(ctx, seCtx, lctx, keys...)
err = doLockKeys(ctx, sctx, lctx, keys...)
if err != nil {
return err
}
Expand Down
72 changes: 15 additions & 57 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,9 +657,6 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
defer func() { b.inSelectLockStmt = false }()
}
b.hasLock = true
if b.err = b.updateForUpdateTSIfNeeded(v.Children()[0]); b.err != nil {
return nil
}
// Build 'select for update' using the 'for update' ts.
b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()

Expand Down Expand Up @@ -865,13 +862,6 @@ func (b *executorBuilder) buildSetConfig(v *plannercore.SetConfig) Executor {

func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
b.inInsertStmt = true
if v.SelectPlan != nil {
// Try to update the forUpdateTS for insert/replace into select statements.
// Set the selectPlan parameter to nil to make it always update the forUpdateTS.
if b.err = b.updateForUpdateTSIfNeeded(nil); b.err != nil {
return nil
}
}
b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
SpadeA-Tang marked this conversation as resolved.
Show resolved Hide resolved
selectExec := b.build(v.SelectPlan)
if b.err != nil {
Expand Down Expand Up @@ -2114,9 +2104,6 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
}
}
}
if b.err = b.updateForUpdateTSIfNeeded(v.SelectPlan); b.err != nil {
return nil
}
b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
SpadeA-Tang marked this conversation as resolved.
Show resolved Hide resolved
selExec := b.build(v.SelectPlan)
if b.err != nil {
Expand Down Expand Up @@ -2171,9 +2158,6 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
for _, info := range v.TblColPosInfos {
tblID2table[info.TblID], _ = b.is.TableByID(info.TblID)
}
if b.err = b.updateForUpdateTSIfNeeded(v.SelectPlan); b.err != nil {
return nil
}
b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

selExec := b.build(v.SelectPlan)
if b.err != nil {
Expand All @@ -2190,38 +2174,6 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
return deleteExec
}

// updateForUpdateTSIfNeeded updates the ForUpdateTS for a pessimistic transaction if needed.
// PointGet executor will get conflict error if the ForUpdateTS is older than the latest commitTS,
// so we don't need to update now for better latency.
func (b *executorBuilder) updateForUpdateTSIfNeeded(selectPlan plannercore.PhysicalPlan) error {
txnCtx := b.ctx.GetSessionVars().TxnCtx
if !txnCtx.IsPessimistic {
return nil
}
if _, ok := selectPlan.(*plannercore.PointGetPlan); ok {
return nil
}
// Activate the invalid txn, use the txn startTS as newForUpdateTS
txn, err := b.ctx.Txn(false)
if err != nil {
return err
}
if !txn.Valid() {
_, err := b.ctx.Txn(true)
if err != nil {
return err
}
return nil
}
// The Repeatable Read transaction use Read Committed level to read data for writing (insert, update, delete, select for update),
// We should always update/refresh the for-update-ts no matter the isolation level is RR or RC.
if b.ctx.GetSessionVars().IsPessimisticReadConsistency() {
_, err = sessiontxn.GetTxnManager(b.ctx).GetStmtForUpdateTS()
return err
}
return UpdateForUpdateTS(b.ctx, 0)
}

func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeIndexTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string) *analyzeTask {
job := &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: autoAnalyze + "analyze index " + task.IndexInfo.Name.O}
_, offset := timeutil.Zone(b.ctx.GetSessionVars().Location())
Expand Down Expand Up @@ -4615,18 +4567,12 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
return nil
}

startTS, err := b.getSnapshotTS()
if err != nil {
b.err = err
return nil
}
decoder := NewRowDecoder(b.ctx, plan.Schema(), plan.TblInfo)
e := &BatchPointGetExec{
baseExecutor: newBaseExecutor(b.ctx, plan.Schema(), plan.ID()),
tblInfo: plan.TblInfo,
idxInfo: plan.IndexInfo,
rowDecoder: decoder,
startTS: startTS,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
keepOrder: plan.KeepOrder,
Expand All @@ -4639,18 +4585,30 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
partTblID: plan.PartTblID,
columns: plan.Columns,
}
if plan.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
e.cacheTable = b.getCacheTable(plan.TblInfo, startTS)
}

if plan.TblInfo.TempTableType != model.TempTableNone {
// Temporary table should not do any lock operations
e.lock = false
e.waitTime = 0
}

txnManager := sessiontxn.GetTxnManager(b.ctx)
var err error
if e.lock {
SpadeA-Tang marked this conversation as resolved.
Show resolved Hide resolved
b.hasLock = true
e.startTS, err = txnManager.GetStmtForUpdateTS()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better rename startTS => snapshotTS because it is not the start ts of the transaction.

} else {
e.startTS, err = txnManager.GetStmtReadTS()
}
if err != nil {
b.err = err
return nil
}

if plan.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
e.cacheTable = b.getCacheTable(plan.TblInfo, e.startTS)
}

var capacity int
if plan.IndexInfo != nil && !isCommonHandleRead(plan.TblInfo, plan.IndexInfo) {
e.idxVals = plan.IndexValues
Expand Down
19 changes: 14 additions & 5 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -1042,12 +1043,20 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
for id := range e.tblID2Handle {
e.updateDeltaForTableID(id)
}

return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime, len(e.keys)), e.keys...)
lockCtx, err := newLockCtx(e.ctx, lockWaitTime, len(e.keys))
if err != nil {
return err
}
return doLockKeys(ctx, e.ctx, lockCtx, e.keys...)
}

func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64, numKeys int) *tikvstore.LockCtx {
lockCtx := tikvstore.NewLockCtx(seVars.TxnCtx.GetForUpdateTS(), lockWaitTime, seVars.StmtCtx.GetLockWaitStartTime())
func newLockCtx(sctx sessionctx.Context, lockWaitTime int64, numKeys int) (*tikvstore.LockCtx, error) {
seVars := sctx.GetSessionVars()
forUpdateTs, err := sessiontxn.GetTxnManager(sctx).GetStmtForUpdateTS()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
forUpdateTs, err := sessiontxn.GetTxnManager(sctx).GetStmtForUpdateTS()
forUpdateTS, err := sessiontxn.GetTxnManager(sctx).GetStmtForUpdateTS()

if err != nil {
return nil, err
}
lockCtx := tikvstore.NewLockCtx(forUpdateTs, lockWaitTime, seVars.StmtCtx.GetLockWaitStartTime())
lockCtx.Killed = &seVars.Killed
lockCtx.PessimisticLockWaited = &seVars.StmtCtx.PessimisticLockWaited
lockCtx.LockKeysDuration = &seVars.StmtCtx.LockKeysDuration
Expand Down Expand Up @@ -1082,7 +1091,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64, numKeys int) *
if lockCtx.ForUpdateTS > 0 && seVars.AssertionLevel != variable.AssertionLevelOff {
lockCtx.InitCheckExistence(numKeys)
}
return lockCtx
return lockCtx, nil
}

// doLockKeys is the main entry for pessimistic lock keys
Expand Down
57 changes: 41 additions & 16 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package executor
import (
"context"
"fmt"
"math"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand All @@ -49,26 +51,30 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
return nil
}

startTS, err := b.getSnapshotTS()
if err != nil {
b.err = err
ekexium marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
e := &PointGetExecutor{
baseExecutor: newBaseExecutor(b.ctx, p.Schema(), p.ID()),
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
}

if p.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
e.cacheTable = b.getCacheTable(p.TblInfo, startTS)
}
e.base().initCap = 1
e.base().maxChunkSize = 1
e.Init(p, startTS)

upperLock := b.inInsertStmt || b.inUpdateStmt || b.inDeleteStmt || b.inSelectLockStmt

err := e.Init(p, false, upperLock)
if err != nil {
b.err = err
SpadeA-Tang marked this conversation as resolved.
Show resolved Hide resolved
}

if p.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
e.cacheTable = b.getCacheTable(p.TblInfo, e.startTS)
}

if e.lock {
b.hasLock = true
}

return e
}

Expand Down Expand Up @@ -106,13 +112,12 @@ type PointGetExecutor struct {
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) {
func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, useMaxTs bool, upperLock bool) error {
SpadeA-Tang marked this conversation as resolved.
Show resolved Hide resolved
decoder := NewRowDecoder(e.ctx, p.Schema(), p.TblInfo)
e.tblInfo = p.TblInfo
e.handle = p.Handle
e.idxInfo = p.IndexInfo
e.idxVals = p.IndexValues
e.startTS = startTs
e.done = false
if e.tblInfo.TempTableType == model.TempTableNone {
e.lock = p.Lock
Expand All @@ -122,10 +127,30 @@ func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) {
e.lock = false
e.lockWaitTime = 0
}

if useMaxTs {
SpadeA-Tang marked this conversation as resolved.
Show resolved Hide resolved
e.startTS = uint64(math.MaxUint64)
} else {
var startTs uint64
var err error
txnManager := sessiontxn.GetTxnManager(e.ctx)
if e.lock || upperLock {
if startTs, err = txnManager.GetStmtForUpdateTS(); err != nil {
return err
}
} else {
if startTs, err = txnManager.GetStmtReadTS(); err != nil {
return err
}
}
e.startTS = startTs
}

e.rowDecoder = decoder
e.partInfo = p.PartitionInfo
e.columns = p.Columns
e.buildVirtualColumnInfo()
return nil
}

// buildVirtualColumnInfo saves virtual column indices and sort them in definition order
Expand All @@ -143,9 +168,6 @@ func (e *PointGetExecutor) buildVirtualColumnInfo() {
func (e *PointGetExecutor) Open(context.Context) error {
txnCtx := e.ctx.GetSessionVars().TxnCtx
snapshotTS := e.startTS
if e.lock {
snapshotTS = txnCtx.GetForUpdateTS()
}
var err error
e.txn, err = e.ctx.Txn(false)
if err != nil {
Expand Down Expand Up @@ -381,9 +403,12 @@ func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) erro
}
if e.lock {
seVars := e.ctx.GetSessionVars()
lockCtx := newLockCtx(seVars, e.lockWaitTime, 1)
lockCtx, err := newLockCtx(e.ctx, e.lockWaitTime, 1)
if err != nil {
return err
}
lockCtx.InitReturnValues(1)
err := doLockKeys(ctx, e.ctx, lockCtx, key)
err = doLockKeys(ctx, e.ctx, lockCtx, key)
if err != nil {
return err
}
Expand Down
18 changes: 18 additions & 0 deletions sessiontxn/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,21 @@ func AssertTxnManagerReadTS(sctx sessionctx.Context, expected uint64) {
panic(fmt.Sprintf("Txn read ts not match, expect:%d, got:%d", expected, actual))
}
}

// AssertLockErr is used to record the lock errors we encountered
// Only for test
var AssertLockErr stringutil.StringerStr = "assertLockError"

// AddEntrance is used only for test
func AddEntrance(sctx sessionctx.Context, name string) {
SpadeA-Tang marked this conversation as resolved.
Show resolved Hide resolved
records, ok := sctx.Value(AssertLockErr).(map[string]int)
if !ok {
records = make(map[string]int)
sctx.SetValue(AssertLockErr, records)
}
if v, ok := records[name]; ok {
records[name] = v + 1
} else {
records[name] = 1
}
}
Loading