Skip to content

Commit

Permalink
store/tikv: abort optimistic transaction when prewrite encounters a l…
Browse files Browse the repository at this point in the history
…ock with larger TS (#29776)

ref tikv/tikv#11148
  • Loading branch information
sticnarf authored Dec 20, 2021
1 parent ff8d96a commit dd09a4a
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
37 changes: 37 additions & 0 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,3 +671,40 @@ func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) {
_, err = t3.Get(context.Background(), []byte("fb2"))
errMsgMustContain(c, err, "key not exist")
}

func (s *testLockSuite) TestPrewriteEncountersLargerTsLock(c *C) {
t1, err := s.store.Begin()
c.Assert(err, IsNil)
t1.Set([]byte("k1"), []byte("v1"))
t1.Set([]byte("k2"), []byte("v2"))

// t2 has larger TS. Let t2 prewrite only the secondary lock.
t2, err := s.store.Begin()
c.Assert(err, IsNil)
t2.Set([]byte("k1"), []byte("v1"))
t2.Set([]byte("k2"), []byte("v2"))
committer, err := newTwoPhaseCommitterWithInit(t2, 1)
c.Assert(err, IsNil)
committer.lockTTL = 20000 // set TTL to 20s

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/twoPCRequestBatchSizeLimit", "return"), IsNil)
defer failpoint.Disable("github.com/pingcap/tidb/store/tikv/twoPCRequestBatchSizeLimit")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/prewritePrimary", "pause"), IsNil)
ch := make(chan struct{})
go func() {
err = committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutations)
c.Assert(err, IsNil)
ch <- struct{}{}
}()
time.Sleep(200 * time.Millisecond) // make prewrite earlier than t1 commits

// Set 1 second timeout. If we still need to wait until t2 expires, we will get a timeout error
// instead of write conflict.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
err = t1.Commit(ctx)
c.Assert(kv.ErrWriteConflict.Equal(err), IsTrue)

c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/prewritePrimary"), IsNil)
<-ch
}
10 changes: 10 additions & 0 deletions store/tikv/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
Expand Down Expand Up @@ -257,7 +258,16 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
}
logutil.BgLogger().Info("prewrite encounters lock",
zap.Uint64("session", c.sessionID),
zap.Uint64("txnID", c.startTS),
zap.Stringer("lock", lock))
// If an optimistic transaction encounters a lock with larger TS, this transaction will certainly
// fail due to a WriteConflict error. So we can construct and return an error here early.
// Pessimistic transactions don't need such an optimization. If this key needs a pessimistic lock,
// TiKV will return a PessimisticLockNotFound error directly if it encounters a different lock. Otherwise,
// TiKV returns lock.TTL = 0, and we still need to resolve the lock.
if lock.TxnID > c.startTS && !c.isPessimistic {
return kv.ErrWriteConflict.GenWithStackByArgs(c.startTS, lock.TxnID, 0, lock.Key)
}
locks = append(locks, lock)
}
start := time.Now()
Expand Down

0 comments on commit dd09a4a

Please sign in to comment.