Skip to content

Commit

Permalink
transaction: fix union select for update race (#19006) (#19022)
Browse files Browse the repository at this point in the history
* transaction: fix LockKeys race

* do not update delta for lock keys

* fix more race

* fix another race

Co-authored-by: ti-srebot <[email protected]>

Co-authored-by: Evan Zhou <[email protected]>
  • Loading branch information
ti-srebot and coocood authored Aug 6, 2020
1 parent ebcd49f commit 1a91fc5
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 24 deletions.
9 changes: 1 addition & 8 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,13 +921,6 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
lockWaitTime = kv.LockNoWait
}

if len(e.keys) > 0 {
// This operation is only for schema validator check.
for id := range e.tblID2Handle {
e.ctx.GetSessionVars().TxnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{})
}
}

return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime), e.keys...)
}

Expand All @@ -952,7 +945,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx {
func doLockKeys(ctx context.Context, se sessionctx.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error {
sctx := se.GetSessionVars().StmtCtx
if !sctx.InUpdateStmt && !sctx.InDeleteStmt {
se.GetSessionVars().TxnCtx.ForUpdate = true
atomic.StoreUint32(&se.GetSessionVars().TxnCtx.ForUpdate, 1)
}
// Lock keys only once when finished fetching all results.
txn, err := se.Txn(true)
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ type LockCtx struct {
LockWaitTime int64
WaitStartTime time.Time
PessimisticLockWaited *int32
LockKeysDuration *time.Duration
LockKeysDuration *int64
LockKeysCount *int32
ReturnValues bool
Values map[string]ReturnedValue
Expand Down
12 changes: 12 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1535,3 +1535,15 @@ func (s *testPessimisticSuite) TestInsertDupKeyAfterLockBatchPointGet(c *C) {
err = tk.ExecToErr("commit")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
}

func (s *testPessimisticSuite) TestPessimisticUnionForUpdate(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int, v int, k int, primary key (id), key kk(k))")
tk.MustExec("insert into t select 1, 1, 1")
tk.MustExec("begin pessimistic")
tk.MustQuery("(select * from t where id between 0 and 1 for update) union all (select * from t where id between 0 and 1 for update)")
tk.MustExec("update t set k = 2 where k = 1")
tk.MustExec("commit")
tk.MustExec("admin check table t")
}
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) {

connID := s.sessionVars.ConnectionID
s.sessionVars.RetryInfo.Retrying = true
if s.sessionVars.TxnCtx.ForUpdate {
if atomic.LoadUint32(&s.sessionVars.TxnCtx.ForUpdate) == 1 {
err = ErrForUpdateCantRetry.GenWithStackByArgs(connID)
return err
}
Expand Down
17 changes: 9 additions & 8 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ type StatementContext struct {
planNormalized string
planDigest string
Tables []TableEntry
PointExec bool // for point update cached execution, Constant expression need to set "paramMarker"
lockWaitStartTime *time.Time // LockWaitStartTime stores the pessimistic lock wait start time
PointExec bool // for point update cached execution, Constant expression need to set "paramMarker"
lockWaitStartTime int64 // LockWaitStartTime stores the pessimistic lock wait start time
PessimisticLockWaited int32
LockKeysDuration time.Duration
LockKeysDuration int64
LockKeysCount int32
TblInfo2UnionScan map[*model.TableInfo]bool
TaskID uint64 // unique ID for an execution of a statement
Expand Down Expand Up @@ -488,7 +488,7 @@ func (sc *StatementContext) GetExecDetails() execdetails.ExecDetails {
var details execdetails.ExecDetails
sc.mu.Lock()
details = sc.mu.execDetails
details.LockKeysDuration = sc.LockKeysDuration
details.LockKeysDuration = time.Duration(atomic.LoadInt64(&sc.LockKeysDuration))
sc.mu.Unlock()
return details
}
Expand Down Expand Up @@ -629,11 +629,12 @@ func (sc *StatementContext) SetFlagsFromPBFlag(flags uint64) {

// GetLockWaitStartTime returns the statement pessimistic lock wait start time
func (sc *StatementContext) GetLockWaitStartTime() time.Time {
if sc.lockWaitStartTime == nil {
curTime := time.Now()
sc.lockWaitStartTime = &curTime
startTime := atomic.LoadInt64(&sc.lockWaitStartTime)
if startTime == 0 {
startTime = time.Now().UnixNano()
atomic.StoreInt64(&sc.lockWaitStartTime, startTime)
}
return *sc.lockWaitStartTime
return time.Unix(0, startTime)
}

//CopTasksDetails collects some useful information of cop-tasks during execution.
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ type TransactionContext struct {
// CreateTime For metrics.
CreateTime time.Time
StatementCount int
ForUpdate bool
CouldRetry bool
IsPessimistic bool
Isolation string
LockExpire uint32
ForUpdate uint32
}

// AddUnchangedRowKey adds an unchanged row key in update statement for pessimistic lock.
Expand Down
8 changes: 3 additions & 5 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error {

// lockWaitTime in ms, except that kv.LockAlwaysWait(0) means always wait lock, kv.LockNowait(-1) means nowait lock
func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput ...kv.Key) error {
txn.mu.Lock()
defer txn.mu.Unlock()
// Exclude keys that are already locked.
var err error
keys := make([][]byte, 0, len(keysInput))
Expand All @@ -352,7 +354,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
if lockCtx.PessimisticLockWaited != nil {
if atomic.LoadInt32(lockCtx.PessimisticLockWaited) > 0 {
timeWaited := time.Since(lockCtx.WaitStartTime)
*lockCtx.LockKeysDuration = timeWaited
atomic.StoreInt64(lockCtx.LockKeysDuration, int64(timeWaited))
metrics.TiKVPessimisticLockKeysDuration.Observe(timeWaited.Seconds())
}
}
Expand All @@ -361,7 +363,6 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
*lockCtx.LockKeysCount += int32(len(keys))
}
}()
txn.mu.Lock()
for _, key := range keysInput {
// The value of lockedMap is only used by pessimistic transactions.
valueExist, locked := txn.lockedMap[string(key)]
Expand All @@ -385,7 +386,6 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
lockCtx.Values[string(key)] = kv.ReturnedValue{AlreadyLocked: true}
}
}
txn.mu.Unlock()
if len(keys) == 0 {
return nil
}
Expand Down Expand Up @@ -451,7 +451,6 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
txn.committer.ttlManager.run(txn.committer, lockCtx)
}
}
txn.mu.Lock()
txn.lockKeys = append(txn.lockKeys, keys...)
for _, key := range keys {
// PointGet and BatchPointGet will return value in pessimistic lock response, the value may not exists.
Expand All @@ -465,7 +464,6 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
}
}
txn.dirty = true
txn.mu.Unlock()
return nil
}

Expand Down

0 comments on commit 1a91fc5

Please sign in to comment.