From 9a75e50d633e45cfde68ec911e31275a5b81b18b Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 8 Mar 2021 13:40:54 +0800 Subject: [PATCH] store/tikv: use original snapshot timestamp to resolve locks (#23044) --- store/tikv/2pc.go | 2 +- store/tikv/snapshot.go | 6 ++++- store/tikv/snapshot_fail_test.go | 45 ++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 20435d2776a4b..08a3b9992930c 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -1126,7 +1126,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } } } - c.commitTS = commitTS + atomic.StoreUint64(&c.commitTS, commitTS) if c.store.oracle.IsExpired(c.startTS, kv.MaxTxnTimeUse, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) { err = errors.Errorf("session %d txn takes too much time, txnStartTS: %d, comm: %d", diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 2585835d8d88c..15cc1f96a860b 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -481,6 +481,7 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte return nil, errors.Trace(err) } + snapVer := s.version.Ver if s.version == kv.MaxVersion { newTS, err := tsFuture.Wait() if err != nil { @@ -494,7 +495,10 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte } } - msBeforeExpired, err := cli.ResolveLocks(bo, s.version.Ver, []*Lock{lock}) + // Use the original snapshot version to resolve locks so we can use MaxUint64 + // as the callerStartTS if it's an auto-commit point get. This could save us + // one write at TiKV by not pushing forward the minCommitTS. + msBeforeExpired, err := cli.ResolveLocks(bo, snapVer, []*Lock{lock}) if err != nil { return nil, errors.Trace(err) } diff --git a/store/tikv/snapshot_fail_test.go b/store/tikv/snapshot_fail_test.go index 7600293786fef..472d65ed0a5a6 100644 --- a/store/tikv/snapshot_fail_test.go +++ b/store/tikv/snapshot_fail_test.go @@ -15,6 +15,8 @@ package tikv import ( "context" + "sync/atomic" + "time" . "github.com/pingcap/check" "github.com/pingcap/failpoint" @@ -153,6 +155,7 @@ func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) { err = txn.Set([]byte("k4"), []byte("v4")) c.Assert(err, IsNil) txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetOption(kv.Enable1PC, false) txn.SetOption(kv.GuaranteeLinearizability, false) // Prewrite an async-commit lock and do not commit it. c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", `return`), IsNil) @@ -169,3 +172,45 @@ func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing"), IsNil) } + +func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) { + defer s.cleanup(c) + + txn, err := s.store.Begin() + c.Assert(err, IsNil) + err = txn.Set([]byte("k1"), []byte("v1")) + err = txn.Set([]byte("k2"), []byte("v2")) + c.Assert(err, IsNil) + txn.SetOption(kv.EnableAsyncCommit, false) + txn.SetOption(kv.Enable1PC, false) + txn.SetOption(kv.GuaranteeLinearizability, false) + + // Prewrite the lock without committing it + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeCommit", `pause`), IsNil) + ch := make(chan struct{}) + committer, err := newTwoPhaseCommitterWithInit(txn, 1) + c.Assert(committer.primary(), DeepEquals, []byte("k1")) + go func() { + c.Assert(err, IsNil) + err = committer.execute(context.Background()) + c.Assert(err, IsNil) + ch <- struct{}{} + }() + + // Wait until prewrite finishes + time.Sleep(200 * time.Millisecond) + // Should get nothing with max version, and **not pushing forward minCommitTS** of the primary lock + snapshot := s.store.GetSnapshot(kv.MaxVersion) + _, err = snapshot.Get(context.Background(), []byte("k2")) + c.Assert(err, ErrorMatches, ".*key not exist") + + initialCommitTS := atomic.LoadUint64(&committer.commitTS) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeCommit"), IsNil) + + <-ch + // check the minCommitTS is not pushed forward + snapshot = s.store.GetSnapshot(kv.Version{Ver: initialCommitTS}) + v, err := snapshot.Get(context.Background(), []byte("k2")) + c.Assert(err, IsNil) + c.Assert(v, DeepEquals, []byte("v2")) +}