From 9263cf2567eff3a38a6275e967ad40345844e0b8 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 9 Oct 2019 18:48:34 +0800 Subject: [PATCH 1/3] store/tikv: implement a `ttlManager` to update the TTL of a transaction (#12177) --- metrics/metrics.go | 2 + metrics/tikvclient.go | 17 +++++ session/pessimistic_test.go | 7 +- store/mockstore/mocktikv/mvcc_leveldb.go | 1 - store/tikv/2pc.go | 92 +++++++++++++++++++++++- store/tikv/2pc_test.go | 27 ++++++- store/tikv/txn.go | 2 + 7 files changed, 139 insertions(+), 9 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index a09390ab9e33c..87cf9e725e67b 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -151,5 +151,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVRangeTaskStats) prometheus.MustRegister(TiKVRangeTaskPushDuration) prometheus.MustRegister(HandleSchemaValidate) + prometheus.MustRegister(TiKVTokenWaitDuration) + prometheus.MustRegister(TiKVTxnHeartBeatHistogram) prometheus.MustRegister(GRPCConnTransientFailureCounter) } diff --git a/metrics/tikvclient.go b/metrics/tikvclient.go index ce6be01ce596d..8b9cb514338b1 100644 --- a/metrics/tikvclient.go +++ b/metrics/tikvclient.go @@ -223,4 +223,21 @@ var ( Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), Help: "duration to push sub tasks to range task workers", }, []string{LblType}) + TiKVTokenWaitDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "tidb", + Subsystem: "tikvclient", + Name: "batch_executor_token_wait_duration", + Buckets: prometheus.ExponentialBuckets(1, 2, 30), // 1ns ~ 1s + Help: "tidb txn token wait duration to process batches", + }) + + TiKVTxnHeartBeatHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "tidb", + Subsystem: "tikvclient", + Name: "txn_heart_beat", + Help: "Bucketed histogram of the txn_heartbeat request duration.", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 18), // 1ms ~ 292s + }, []string{LblType}) ) diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 3dbe8aa7566f7..98abee8433e2d 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -365,14 +365,13 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) { syncCh <- struct{}{} tk.MustQuery("select c from conflict where id = 1").Check(testkit.Rows("3")) - // Check outdated pessimistic lock is resolved. + // Check pessimistic lock is not resolved. tk.MustExec("begin pessimistic") tk.MustExec("update conflict set c = 4 where id = 1") - time.Sleep(300 * time.Millisecond) tk2.MustExec("begin optimistic") tk2.MustExec("update conflict set c = 5 where id = 1") - tk2.MustExec("commit") - _, err := tk.Exec("commit") + // TODO: ResolveLock block until timeout, takes about 40s, makes CI slow! + _, err := tk2.Exec("commit") c.Check(err, NotNil) // Update snapshotTS after a conflict, invalidate snapshot cache. diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 8753ad3033dda..3a30ba637d627 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -986,7 +986,6 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { } // If current transaction's lock exists. if ok && dec.lock.startTS == startTS { - // If the lock has already outdated, clean up it. if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { if err = rollbackLock(batch, dec.lock, key, startTS); err != nil { diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 1af2cbd45865a..3064f6fe68417 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx/binloginfo" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/execdetails" @@ -52,11 +53,13 @@ const ( var ( tikvSecondaryLockCleanupFailureCounterCommit = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit") tikvSecondaryLockCleanupFailureCounterRollback = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback") + tiKVTxnHeartBeatHistogramOK = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("ok") + tiKVTxnHeartBeatHistogramError = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("err") ) // Global variable set by config file. var ( - PessimisticLockTTL uint64 + PessimisticLockTTL uint64 = 15000 // 15s ~ 40s ) func (ca twoPhaseCommitAction) String() string { @@ -112,6 +115,8 @@ type twoPhaseCommitter struct { isFirstLock bool // regionTxnSize stores the number of keys involved in each region regionTxnSize map[uint64]int + // Used by pessimistic transaction and large transaction. + ttlManager } type mutationEx struct { @@ -128,6 +133,9 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro startTS: txn.StartTS(), connID: connID, regionTxnSize: map[uint64]int{}, + ttlManager: ttlManager{ + ch: make(chan struct{}), + }, }, nil } @@ -586,6 +594,10 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) } keyErrs := prewriteResp.GetErrors() if len(keyErrs) == 0 { + isPrimary := bytes.Equal(batch.keys[0], c.primary()) + if isPrimary && c.isPessimistic { + c.ttlManager.run(c) + } return nil } var locks []*Lock @@ -628,6 +640,80 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) } } +type ttlManagerState uint32 + +const ( + stateUninitialized ttlManagerState = iota + stateRunning + stateClosed +) + +type ttlManager struct { + state ttlManagerState + ch chan struct{} +} + +func (tm *ttlManager) run(c *twoPhaseCommitter) { + // Run only once. + if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateUninitialized), uint32(stateRunning)) { + return + } + go tm.keepAlive(c) +} + +func (tm *ttlManager) close() { + if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateRunning), uint32(stateClosed)) { + return + } + close(tm.ch) +} + +func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { + // Ticker is set to 1/3 of the PessimisticLockTTL. + ticker := time.NewTicker(time.Duration(PessimisticLockTTL) * time.Millisecond / 3) + defer ticker.Stop() + for { + select { + case <-tm.ch: + return + case <-ticker.C: + bo := NewBackoffer(context.Background(), pessimisticLockMaxBackoff) + now, err := c.store.GetOracle().GetTimestamp(bo.ctx) + if err != nil { + err1 := bo.Backoff(BoPDRPC, err) + if err1 != nil { + logutil.BgLogger().Warn("keepAlive get tso fail", + zap.Error(err)) + return + } + continue + } + + uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS)) + const c10min = 10 * 60 * 1000 + if uptime > c10min { + // Set a 10min maximum lifetime for the ttlManager, so when something goes wrong + // the key will not be locked forever. + logutil.BgLogger().Info("ttlManager live up to its lifetime", + zap.Uint64("txnStartTS", c.startTS)) + return + } + + newTTL := uptime + PessimisticLockTTL + startTime := time.Now() + _, err = sendTxnHeartBeat(bo, c.store, c.primary(), c.startTS, newTTL) + if err != nil { + tiKVTxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds()) + logutil.BgLogger().Warn("send TxnHeartBeat failed", + zap.Error(err), + zap.Uint64("txnStartTS", c.startTS)) + return + } + tiKVTxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds()) + } + } +} + func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys) error { mutations := make([]*pb.Mutation, len(batch.keys)) for i, k := range batch.keys { @@ -680,6 +766,10 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc } keyErrs := lockResp.GetErrors() if len(keyErrs) == 0 { + isPrimary := bytes.Equal(batch.keys[0], c.primary()) + if isPrimary { // No need to check isPessimistic because this function is only called in that case. + c.ttlManager.run(c) + } return nil } var locks []*Lock diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index a92f33fdff9fd..05b932ff4562c 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/mocktikv" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" ) @@ -539,9 +540,29 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { lockInfo := s.getLockInfo(c, key) elapsedTTL := lockInfo.LockTtl - PessimisticLockTTL c.Assert(elapsedTTL, GreaterEqual, uint64(100)) - c.Assert(elapsedTTL, Less, uint64(200)) - lockInfo2 := s.getLockInfo(c, key2) - c.Assert(lockInfo2.LockTtl, Equals, lockInfo.LockTtl) + + lr := newLockResolver(s.store) + bo := NewBackoffer(context.Background(), getMaxBackoff) + status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS) + c.Assert(err, IsNil) + c.Assert(status.ttl, Equals, lockInfo.LockTtl) + + // Check primary lock TTL is auto increasing while the pessimistic txn is ongoing. + for i := 0; i < 50; i++ { + lockInfoNew := s.getLockInfo(c, key) + if lockInfoNew.LockTtl > lockInfo.LockTtl { + currentTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx) + c.Assert(err, IsNil) + // Check that the TTL is update to a reasonable range. + expire := oracle.ExtractPhysical(txn.startTS) + int64(lockInfoNew.LockTtl) + now := oracle.ExtractPhysical(currentTS) + c.Assert(expire > now, IsTrue) + c.Assert(uint64(expire-now) <= PessimisticLockTTL, IsTrue) + return + } + time.Sleep(100 * time.Millisecond) + } + c.Assert(false, IsTrue, Commentf("update pessimistic ttl fail")) } func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo { diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 7d671c0be6495..f6ecdc9b2213a 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -273,6 +273,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { return errors.Trace(err) } } + defer committer.ttlManager.close() if err := committer.initKeysAndMutations(); err != nil { return errors.Trace(err) } @@ -331,6 +332,7 @@ func (txn *tikvTxn) Rollback() error { // Clean up pessimistic lock. if txn.IsPessimistic() && txn.committer != nil { err := txn.rollbackPessimisticLocks() + txn.committer.ttlManager.close() if err != nil { logutil.Logger(context.Background()).Error(err.Error()) } From 8ca9de53988096c8a50a2d54181c33b5be92a931 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Thu, 26 Sep 2019 10:36:47 +0800 Subject: [PATCH 2/3] store/tikv: fix ttl manager race (#12398) --- store/tikv/2pc.go | 14 +++----------- store/tikv/txn.go | 3 +++ 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 3064f6fe68417..88cd7a8722272 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -594,10 +594,6 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) } keyErrs := prewriteResp.GetErrors() if len(keyErrs) == 0 { - isPrimary := bytes.Equal(batch.keys[0], c.primary()) - if isPrimary && c.isPessimistic { - c.ttlManager.run(c) - } return nil } var locks []*Lock @@ -682,7 +678,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { if err != nil { err1 := bo.Backoff(BoPDRPC, err) if err1 != nil { - logutil.BgLogger().Warn("keepAlive get tso fail", + logutil.Logger(context.Background()).Warn("keepAlive get tso fail", zap.Error(err)) return } @@ -694,7 +690,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { if uptime > c10min { // Set a 10min maximum lifetime for the ttlManager, so when something goes wrong // the key will not be locked forever. - logutil.BgLogger().Info("ttlManager live up to its lifetime", + logutil.Logger(context.Background()).Info("ttlManager live up to its lifetime", zap.Uint64("txnStartTS", c.startTS)) return } @@ -704,7 +700,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { _, err = sendTxnHeartBeat(bo, c.store, c.primary(), c.startTS, newTTL) if err != nil { tiKVTxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds()) - logutil.BgLogger().Warn("send TxnHeartBeat failed", + logutil.Logger(context.Background()).Warn("send TxnHeartBeat failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) return @@ -766,10 +762,6 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc } keyErrs := lockResp.GetErrors() if len(keyErrs) == 0 { - isPrimary := bytes.Equal(batch.keys[0], c.primary()) - if isPrimary { // No need to check isPessimistic because this function is only called in that case. - c.ttlManager.run(c) - } return nil } var locks []*Lock diff --git a/store/tikv/txn.go b/store/tikv/txn.go index f6ecdc9b2213a..01a32795437d0 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -414,6 +414,9 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keysInput } return err } + if assignedPrimaryKey { + txn.committer.ttlManager.run(txn.committer) + } } txn.mu.Lock() txn.lockKeys = append(txn.lockKeys, keys...) From 48664bf24ab1101747e7ac54b5cd9cdfcb8b288e Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 11 Oct 2019 21:36:04 +0800 Subject: [PATCH 3/3] address comment --- metrics/metrics.go | 1 - metrics/tikvclient.go | 8 -------- 2 files changed, 9 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index 87cf9e725e67b..60efda0a1df28 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -151,7 +151,6 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVRangeTaskStats) prometheus.MustRegister(TiKVRangeTaskPushDuration) prometheus.MustRegister(HandleSchemaValidate) - prometheus.MustRegister(TiKVTokenWaitDuration) prometheus.MustRegister(TiKVTxnHeartBeatHistogram) prometheus.MustRegister(GRPCConnTransientFailureCounter) } diff --git a/metrics/tikvclient.go b/metrics/tikvclient.go index 8b9cb514338b1..f1e52ddde43ab 100644 --- a/metrics/tikvclient.go +++ b/metrics/tikvclient.go @@ -223,14 +223,6 @@ var ( Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), Help: "duration to push sub tasks to range task workers", }, []string{LblType}) - TiKVTokenWaitDuration = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: "tidb", - Subsystem: "tikvclient", - Name: "batch_executor_token_wait_duration", - Buckets: prometheus.ExponentialBuckets(1, 2, 30), // 1ns ~ 1s - Help: "tidb txn token wait duration to process batches", - }) TiKVTxnHeartBeatHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{