From dd09a4ac1b2594d5c86ad7a95c3f751a72b8bf63 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 20 Dec 2021 23:27:46 +0800 Subject: [PATCH] store/tikv: abort optimistic transaction when prewrite encounters a lock with larger TS (#29776) ref tikv/tikv#11148 --- store/tikv/lock_test.go | 37 +++++++++++++++++++++++++++++++++++++ store/tikv/prewrite.go | 10 ++++++++++ 2 files changed, 47 insertions(+) diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 11bd6b1049e4b..bf8c7e6d13cb0 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -671,3 +671,40 @@ func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) { _, err = t3.Get(context.Background(), []byte("fb2")) errMsgMustContain(c, err, "key not exist") } + +func (s *testLockSuite) TestPrewriteEncountersLargerTsLock(c *C) { + t1, err := s.store.Begin() + c.Assert(err, IsNil) + t1.Set([]byte("k1"), []byte("v1")) + t1.Set([]byte("k2"), []byte("v2")) + + // t2 has larger TS. Let t2 prewrite only the secondary lock. + t2, err := s.store.Begin() + c.Assert(err, IsNil) + t2.Set([]byte("k1"), []byte("v1")) + t2.Set([]byte("k2"), []byte("v2")) + committer, err := newTwoPhaseCommitterWithInit(t2, 1) + c.Assert(err, IsNil) + committer.lockTTL = 20000 // set TTL to 20s + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/twoPCRequestBatchSizeLimit", "return"), IsNil) + defer failpoint.Disable("github.com/pingcap/tidb/store/tikv/twoPCRequestBatchSizeLimit") + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/prewritePrimary", "pause"), IsNil) + ch := make(chan struct{}) + go func() { + err = committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutations) + c.Assert(err, IsNil) + ch <- struct{}{} + }() + time.Sleep(200 * time.Millisecond) // make prewrite earlier than t1 commits + + // Set 1 second timeout. If we still need to wait until t2 expires, we will get a timeout error + // instead of write conflict. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + err = t1.Commit(ctx) + c.Assert(kv.ErrWriteConflict.Equal(err), IsTrue) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/prewritePrimary"), IsNil) + <-ch +} diff --git a/store/tikv/prewrite.go b/store/tikv/prewrite.go index e4e55f04d75f6..af025c084b6f8 100644 --- a/store/tikv/prewrite.go +++ b/store/tikv/prewrite.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/config" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" @@ -257,7 +258,16 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff } logutil.BgLogger().Info("prewrite encounters lock", zap.Uint64("session", c.sessionID), + zap.Uint64("txnID", c.startTS), zap.Stringer("lock", lock)) + // If an optimistic transaction encounters a lock with larger TS, this transaction will certainly + // fail due to a WriteConflict error. So we can construct and return an error here early. + // Pessimistic transactions don't need such an optimization. If this key needs a pessimistic lock, + // TiKV will return a PessimisticLockNotFound error directly if it encounters a different lock. Otherwise, + // TiKV returns lock.TTL = 0, and we still need to resolve the lock. + if lock.TxnID > c.startTS && !c.isPessimistic { + return kv.ErrWriteConflict.GenWithStackByArgs(c.startTS, lock.TxnID, 0, lock.Key) + } locks = append(locks, lock) } start := time.Now()