From 8b3b01e8d770bbe918ce64d010103f3ad66f28e0 Mon Sep 17 00:00:00 2001 From: zyguan Date: Fri, 2 Feb 2024 20:13:23 +0800 Subject: [PATCH] introduce a random jitter to region cache ttl (#1148) * introduce a random jitter to region cache ttl Signed-off-by: zyguan * refactor searching cached region Signed-off-by: zyguan * observe load region by reason Signed-off-by: zyguan * address the comment Signed-off-by: zyguan --------- Signed-off-by: zyguan --- internal/locate/region_cache.go | 218 +++++++++++++------ internal/locate/region_cache_test.go | 34 +-- internal/locate/region_request3_test.go | 33 +-- internal/locate/region_request_state_test.go | 14 +- internal/locate/region_request_test.go | 17 +- internal/locate/sorted_btree.go | 16 +- metrics/metrics.go | 11 + tikv/region.go | 8 +- 8 files changed, 233 insertions(+), 118 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 3b802f4525..e8f652e00f 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -56,6 +56,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" tikverr "github.com/tikv/client-go/v2/error" @@ -81,9 +82,9 @@ import ( ) const ( - btreeDegree = 32 - invalidatedLastAccessTime = -1 - defaultRegionsPerBatch = 128 + btreeDegree = 32 + expiredTTL = -1 + defaultRegionsPerBatch = 128 ) // LabelFilter returns false means label doesn't match, and will ignore this store. @@ -121,6 +122,29 @@ func SetRegionCacheTTLSec(t int64) { regionCacheTTLSec = t } +// regionCacheTTLJitterSec is the max jitter time for region cache TTL. +var regionCacheTTLJitterSec int64 = 60 + +// SetRegionCacheTTLWithJitter sets region cache TTL with jitter. The real TTL is in range of [base, base+jitter). +func SetRegionCacheTTLWithJitter(base int64, jitter int64) { + regionCacheTTLSec = base + regionCacheTTLJitterSec = jitter +} + +// nextTTL returns a random TTL in range [ts+base, ts+base+jitter). The input ts should be an epoch timestamp in seconds. +func nextTTL(ts int64) int64 { + jitter := int64(0) + if regionCacheTTLJitterSec > 0 { + jitter = rand.Int63n(regionCacheTTLJitterSec) + } + return ts + regionCacheTTLSec + jitter +} + +// nextTTLWithoutJitter is used for test. +func nextTTLWithoutJitter(ts int64) int64 { + return ts + regionCacheTTLSec +} + const ( needReloadOnAccess int32 = 1 << iota // indicates the region will be reloaded on next access needExpireAfterTTL // indicates the region will expire after RegionCacheTTL (even when it's accessed continuously) @@ -150,11 +174,30 @@ const ( Other ) +func (r InvalidReason) String() string { + switch r { + case Ok: + return "Ok" + case Other: + return "Other" + case EpochNotMatch: + return "EpochNotMatch" + case RegionNotFound: + return "RegionNotFound" + case StoreNotFound: + return "StoreNotFound" + case NoLeader: + return "NoLeader" + default: + return "Unknown" + } +} + // Region presents kv region type Region struct { meta *metapb.Region // raw region meta from PD, immutable after init store unsafe.Pointer // point to region store info, see RegionStore - lastAccess int64 // last region access time, see checkRegionCacheTTL + ttl int64 // region TTL in epoch seconds, see checkRegionCacheTTL syncFlags int32 // region need be sync later, see needReloadOnAccess, needExpireAfterTTL invalidReason InvalidReason // the reason why the region is invalidated } @@ -338,7 +381,7 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio } // mark region has been init accessed. - r.lastAccess = time.Now().Unix() + r.ttl = nextTTL(time.Now().Unix()) return r, nil } @@ -356,8 +399,7 @@ func (r *Region) compareAndSwapStore(oldStore, newStore *regionStore) bool { } func (r *Region) isCacheTTLExpired(ts int64) bool { - lastAccess := atomic.LoadInt64(&r.lastAccess) - return ts-lastAccess > regionCacheTTLSec + return ts > atomic.LoadInt64(&r.ttl) } // checkRegionCacheTTL returns false means the region cache is expired. @@ -366,28 +408,36 @@ func (r *Region) checkRegionCacheTTL(ts int64) bool { if _, err := util.EvalFailpoint("invalidateRegionCache"); err == nil { r.invalidate(Other) } + newTTL := int64(0) for { - lastAccess := atomic.LoadInt64(&r.lastAccess) - if ts-lastAccess > regionCacheTTLSec { + ttl := atomic.LoadInt64(&r.ttl) + if ts > ttl { return false } - if r.checkSyncFlags(needExpireAfterTTL) || atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) { + // skip updating TTL when: + // 1. the region has been marked as `needExpireAfterTTL` + // 2. the TTL is far away from ts (still within jitter time) + if r.checkSyncFlags(needExpireAfterTTL) || ttl > ts+regionCacheTTLSec { + return true + } + if newTTL == 0 { + newTTL = nextTTL(ts) + } + // now we have ts <= ttl <= ts+regionCacheTTLSec <= newTTL = ts+regionCacheTTLSec+randomJitter + if atomic.CompareAndSwapInt64(&r.ttl, ttl, newTTL) { return true } } } // invalidate invalidates a region, next time it will got null result. -func (r *Region) invalidate(reason InvalidReason) { - metrics.RegionCacheCounterWithInvalidateRegionFromCacheOK.Inc() - atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason)) - atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime) -} - -// invalidateWithoutMetrics invalidates a region without metrics, next time it will got null result. -func (r *Region) invalidateWithoutMetrics(reason InvalidReason) { - atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason)) - atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime) +func (r *Region) invalidate(reason InvalidReason, nocount ...bool) { + if atomic.CompareAndSwapInt32((*int32)(&r.invalidReason), int32(Ok), int32(reason)) { + if len(nocount) == 0 || !nocount[0] { + metrics.RegionCacheCounterWithInvalidateRegionFromCacheOK.Inc() + } + atomic.StoreInt64(&r.ttl, expiredTTL) + } } func (r *Region) getSyncFlags() int32 { @@ -1068,9 +1118,15 @@ func (c *RegionCache) LocateEndKey(bo *retry.Backoffer, key []byte) (*KeyLocatio } func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey bool) (r *Region, err error) { - r = c.searchCachedRegion(key, isEndKey) - if r == nil { + var expired bool + r, expired = c.searchCachedRegionByKey(key, isEndKey) + tag := "ByKey" + if isEndKey { + tag = "ByEndKey" + } + if r == nil || expired { // load region when it is not exists or expired. + observeLoadRegion(tag, r, expired, 0) lr, err := c.loadRegion(bo, key, isEndKey, pd.WithAllowFollowerHandle()) if err != nil { // no region data, return error if failure. @@ -1083,6 +1139,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey c.mu.Unlock() // just retry once, it won't bring much overhead. if stale { + observeLoadRegion(tag+":Retry", r, expired, 0) lr, err = c.loadRegion(bo, key, isEndKey) if err != nil { // no region data, return error if failure. @@ -1101,8 +1158,10 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey err error ) if reloadOnAccess { + observeLoadRegion(tag, r, expired, flags) lr, err = c.loadRegion(bo, key, isEndKey) } else { + observeLoadRegion("ByID", r, expired, flags) lr, err = c.loadRegionByID(bo, r.GetID()) } if err != nil { @@ -1122,8 +1181,9 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey } func (c *RegionCache) tryFindRegionByKey(key []byte, isEndKey bool) (r *Region) { - r = c.searchCachedRegion(key, isEndKey) - if r == nil || r.checkSyncFlags(needReloadOnAccess) { + var expired bool + r, expired = c.searchCachedRegionByKey(key, isEndKey) + if r == nil || expired || r.checkSyncFlags(needReloadOnAccess) { return nil } return r @@ -1242,12 +1302,11 @@ func (c *RegionCache) OnSendFail(bo *retry.Backoffer, ctx *RPCContext, scheduleR // LocateRegionByID searches for the region with ID. func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*KeyLocation, error) { - c.mu.RLock() - r := c.getRegionByIDFromCache(regionID) - c.mu.RUnlock() - if r != nil { - if flags := r.resetSyncFlags(needReloadOnAccess); flags > 0 { + r, expired := c.searchCachedRegionByID(regionID) + if r != nil && !expired { + if flags := r.resetSyncFlags(needReloadOnAccess | needDelayedReloadReady); flags > 0 { reloadOnAccess := flags&needReloadOnAccess > 0 + observeLoadRegion("ByID", r, expired, flags) lr, err := c.loadRegionByID(bo, regionID) if err != nil { // ignore error and use old region info. @@ -1269,6 +1328,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K return loc, nil } + observeLoadRegion("ByID", r, expired, 0) r, err := c.loadRegionByID(bo, regionID) if err != nil { return nil, err @@ -1500,11 +1560,7 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld // If the old region is still valid, do not invalidate it to avoid unnecessary backoff. if invalidateOldRegion { // Invalidate the old region in case it's not invalidated and some requests try with the stale region information. - if shouldCount { - region.cachedRegion.invalidate(Other) - } else { - region.cachedRegion.invalidateWithoutMetrics(Other) - } + region.cachedRegion.invalidate(Other, !shouldCount) } } // update related vars. @@ -1513,47 +1569,33 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld return true } -// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`, -// it should be called with c.mu.RLock(), and the returned Region should not be -// used after c.mu is RUnlock(). -// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful -// when processing in reverse order. -func (c *RegionCache) searchCachedRegion(key []byte, isEndKey bool) *Region { - ts := time.Now().Unix() - var r *Region +// searchCachedRegionByKey finds the region from cache by key. +func (c *RegionCache) searchCachedRegionByKey(key []byte, isEndKey bool) (*Region, bool) { c.mu.RLock() - r = c.mu.sorted.DescendLessOrEqual(key, isEndKey, ts) + region := c.mu.sorted.SearchByKey(key, isEndKey) c.mu.RUnlock() - if r != nil && (!isEndKey && r.Contains(key) || isEndKey && r.ContainsByEnd(key)) { - return r + if region == nil { + return nil, false } - return nil + return region, !region.checkRegionCacheTTL(time.Now().Unix()) } -// getRegionByIDFromCache tries to get region by regionID from cache. Like -// `getCachedRegion`, it should be called with c.mu.RLock(), and the returned -// Region should not be used after c.mu is RUnlock(). -func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region { - ts := time.Now().Unix() +// searchCachedRegionByID finds the region from cache by id. +func (c *RegionCache) searchCachedRegionByID(regionID uint64) (*Region, bool) { + c.mu.RLock() ver, ok := c.mu.latestVersions[regionID] if !ok { - return nil + c.mu.RUnlock() + return nil, false } - latestRegion, ok := c.mu.regions[ver] + region, ok := c.mu.regions[ver] + c.mu.RUnlock() if !ok { // should not happen - logutil.BgLogger().Warn("region version not found", - zap.Uint64("regionID", regionID), zap.Stringer("version", &ver)) - return nil - } - lastAccess := atomic.LoadInt64(&latestRegion.lastAccess) - if ts-lastAccess > regionCacheTTLSec { - return nil - } - if !latestRegion.checkSyncFlags(needExpireAfterTTL) { - atomic.CompareAndSwapInt64(&latestRegion.lastAccess, atomic.LoadInt64(&latestRegion.lastAccess), ts) + logutil.BgLogger().Warn("region not found", zap.Uint64("id", regionID), zap.Stringer("ver", &ver)) + return nil, false } - return latestRegion + return region, !region.checkRegionCacheTTL(time.Now().Unix()) } // GetStoresByType gets stores by type `typ` @@ -1570,6 +1612,53 @@ func (c *RegionCache) GetAllStores() []*Store { }) } +var loadRegionCounters sync.Map + +const ( + loadRegionReasonMissing = "Missing" + loadRegionReasonExpiredNormal = "Expired:Normal" + loadRegionReasonExpiredFrozen = "Expired:Frozen" + loadRegionReasonExpiredInvalid = "Expired:Invalid:" + loadRegionReasonReloadOnAccess = "Reload:OnAccess" + loadRegionReasonReloadDelayed = "Reload:Delayed" + loadRegionReasonUpdateBuckets = "UpdateBuckets" + loadRegionReasonUnknown = "Unknown" +) + +func observeLoadRegion(tag string, region *Region, expired bool, reloadFlags int32, explicitReason ...string) { + reason := loadRegionReasonUnknown + if len(explicitReason) > 0 { + reason = strings.Join(explicitReason, ":") + } else if region == nil { + reason = loadRegionReasonMissing + } else if expired { + invalidReason := InvalidReason(atomic.LoadInt32((*int32)(®ion.invalidReason))) + if invalidReason != Ok { + reason = loadRegionReasonExpiredInvalid + invalidReason.String() + } else if region.checkSyncFlags(needExpireAfterTTL) { + reason = loadRegionReasonExpiredFrozen + } else { + reason = loadRegionReasonExpiredNormal + } + } else if reloadFlags > 0 { + if reloadFlags&needReloadOnAccess > 0 { + reason = loadRegionReasonReloadOnAccess + } else if reloadFlags&needDelayedReloadReady > 0 { + reason = loadRegionReasonReloadDelayed + } + } + type key struct { + t string + r string + } + counter, ok := loadRegionCounters.Load(key{tag, reason}) + if !ok { + counter = metrics.TiKVLoadRegionCounter.WithLabelValues(tag, reason) + loadRegionCounters.Store(key{tag, reason}, counter) + } + counter.(prometheus.Counter).Inc() +} + // loadRegion loads region from pd client, and picks the first peer as leader. // If the given key is the end key of the region that you want, you may set the second argument to true. This is useful // when processing in reverse order. @@ -2062,6 +2151,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV // TODO(youjiali1995): use singleflight. go func() { bo := retry.NewBackoffer(context.Background(), 20000) + observeLoadRegion("ByID", r, false, 0, loadRegionReasonUpdateBuckets) new, err := c.loadRegionByID(bo, regionID.id) if err != nil { logutil.Logger(bo.GetCtx()).Error("failed to update buckets", diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 2f09239d3c..2d6d9aab3c 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -149,7 +149,8 @@ func validRegions(regions map[RegionVerID]*Region, ts int64) (len int) { func (s *testRegionCacheSuite) getRegion(key []byte) *Region { _, err := s.cache.LocateKey(s.bo, key) s.Nil(err) - r := s.cache.searchCachedRegion(key, false) + r, expired := s.cache.searchCachedRegionByKey(key, false) + s.False(expired) s.NotNil(r) return r } @@ -157,7 +158,8 @@ func (s *testRegionCacheSuite) getRegion(key []byte) *Region { func (s *testRegionCacheSuite) getRegionWithEndKey(key []byte) *Region { _, err := s.cache.LocateEndKey(s.bo, key) s.Nil(err) - r := s.cache.searchCachedRegion(key, true) + r, expired := s.cache.searchCachedRegionByKey(key, true) + s.False(expired) s.NotNil(r) return r } @@ -211,9 +213,11 @@ func (s *testRegionCacheSuite) TestSimple() { s.checkCache(1) s.Equal(r.GetMeta(), r.meta) s.Equal(r.GetLeaderPeerID(), r.meta.Peers[r.getStore().workTiKVIdx].Id) - s.cache.mu.regions[r.VerID()].lastAccess = 0 - r = s.cache.searchCachedRegion([]byte("a"), true) - s.Nil(r) + s.cache.mu.regions[r.VerID()].ttl = 0 + var expired bool + r, expired = s.cache.searchCachedRegionByKey([]byte("a"), true) + s.True(expired) + s.NotNil(r) } // TestResolveStateTransition verifies store's resolve state transition. For example, @@ -310,8 +314,8 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { } func (s *testRegionCacheSuite) TestNeedExpireRegionAfterTTL() { - s.onClosed = func() { SetRegionCacheTTLSec(600) } - SetRegionCacheTTLSec(2) + s.onClosed = func() { SetRegionCacheTTLWithJitter(600, 60) } + SetRegionCacheTTLWithJitter(2, 0) cntGetRegion := 0 s.cache.pdClient = &inspectedPDClient{ @@ -366,8 +370,8 @@ func (s *testRegionCacheSuite) TestNeedExpireRegionAfterTTL() { } func (s *testRegionCacheSuite) TestTiFlashRecoveredFromDown() { - s.onClosed = func() { SetRegionCacheTTLSec(600) } - SetRegionCacheTTLSec(3) + s.onClosed = func() { SetRegionCacheTTLWithJitter(600, 60) } + SetRegionCacheTTLWithJitter(3, 0) store3 := s.cluster.AllocID() peer3 := s.cluster.AllocID() @@ -1508,7 +1512,7 @@ func BenchmarkOnRequestFail(b *testing.B) { if err != nil { b.Fatal(err) } - region := cache.getRegionByIDFromCache(loc.Region.id) + region, _ := cache.searchCachedRegionByID(loc.Region.id) b.ResetTimer() regionStore := region.getStore() store, peer, accessIdx, _ := region.WorkStorePeer(regionStore) @@ -1597,7 +1601,7 @@ func (s *testRegionCacheSuite) TestBuckets() { fakeRegion := &Region{ meta: cachedRegion.meta, syncFlags: cachedRegion.syncFlags, - lastAccess: cachedRegion.lastAccess, + ttl: cachedRegion.ttl, invalidReason: cachedRegion.invalidReason, } fakeRegion.setStore(cachedRegion.getStore().clone()) @@ -1817,7 +1821,7 @@ func (s *testRegionCacheSuite) TestBackgroundCacheGC() { now := time.Now().Unix() for verID, r := range s.cache.mu.regions { if verID.id%3 == 0 { - atomic.StoreInt64(&r.lastAccess, now-regionCacheTTLSec-10) + atomic.StoreInt64(&r.ttl, now-10) } else { remaining++ } @@ -1837,7 +1841,7 @@ func (s *testRegionCacheSuite) TestBackgroundCacheGC() { now = time.Now().Unix() for verID, r := range s.cache.mu.regions { if verID.id%3 == 1 { - atomic.StoreInt64(&r.lastAccess, now-regionCacheTTLSec-10) + atomic.StoreInt64(&r.ttl, now-10) } else { remaining++ } @@ -1929,7 +1933,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() { v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} st := &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true, true) + s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true) r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10) s.Equal(len(r), 2) @@ -2006,7 +2010,7 @@ func (s *testRegionCacheWithDelaySuite) TestInsertStaleRegion() { fakeRegion := &Region{ meta: r.meta, syncFlags: r.syncFlags, - lastAccess: r.lastAccess, + ttl: r.ttl, invalidReason: r.invalidReason, } fakeRegion.setStore(r.getStore().clone()) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index b2562b4858..c5cf930a17 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -356,6 +356,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() { s.Nil(ctx.ProxyStore) } +func refreshRegionTTL(region *Region) { + atomic.StoreInt64(®ion.ttl, nextTTLWithoutJitter(time.Now().Unix())) + atomic.StoreInt32((*int32)(®ion.invalidReason), int32(Ok)) +} + func refreshEpochs(regionStore *regionStore) { for i, store := range regionStore.stores { regionStore.storeEpochs[i] = atomic.LoadUint32(&store.epoch) @@ -405,7 +410,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { region = &Region{ meta: region.GetMeta(), } - atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) + refreshRegionTTL(region) region.meta.Peers = append(region.meta.Peers, tikvLearner) atomic.StorePointer(®ion.store, unsafe.Pointer(regionStore)) @@ -416,7 +421,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { cache.mu.Unlock() // Test accessFollower state with kv.ReplicaReadLearner request type. - atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) + refreshRegionTTL(region) refreshEpochs(regionStore) req.ReplicaReadType = kv.ReplicaReadLearner replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req) @@ -425,7 +430,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { accessLearner, _ := replicaSelector.state.(*accessFollower) // Invalidate the region if the leader is not in the region. - atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) + refreshRegionTTL(region) rpcCtx, err := replicaSelector.next(s.bo) s.Nil(err) // Should switch to the next follower. @@ -456,7 +461,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { region = &Region{ meta: region.GetMeta(), } - atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) + refreshRegionTTL(region) region.meta.Peers = append(region.meta.Peers, tiflash) atomic.StorePointer(®ion.store, unsafe.Pointer(regionStore)) @@ -520,7 +525,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.False(replicaSelector.region.isValid()) // Test switching to tryFollower if leader is unreachable - atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) + refreshRegionTTL(region) replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) @@ -680,7 +685,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.Nil(err) // Test accessFollower state filtering epoch-stale stores. - atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) + refreshRegionTTL(region) refreshEpochs(regionStore) // Mark all followers as stale. tiKVNum := regionStore.accessStoreNum(tiKVOnly) @@ -699,7 +704,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) // Test accessFollower state filtering label-not-match stores. - atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) + refreshRegionTTL(region) refreshEpochs(regionStore) labels := []*metapb.StoreLabel{ { @@ -721,7 +726,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { } // Test accessFollower state with leaderOnly option - atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) + refreshRegionTTL(region) refreshEpochs(regionStore) for i := 0; i < 5; i++ { replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req, WithLeaderOnly()) @@ -734,7 +739,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { } // Test accessFollower state with kv.ReplicaReadMixed request type. - atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) + refreshRegionTTL(region) refreshEpochs(regionStore) req.ReplicaReadType = kv.ReplicaReadMixed replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) @@ -742,7 +747,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.Nil(err) // Invalidate the region if the leader is not in the region. - atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) + refreshRegionTTL(region) replicaSelector.updateLeader(&metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()}) s.False(region.isValid()) // Don't try next replica if the region is invalidated. @@ -1248,7 +1253,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() { return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil }} - region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID()) + region, _ := s.cache.searchCachedRegionByID(regionLoc.Region.GetID()) s.True(region.isValid()) req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil) @@ -1480,7 +1485,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg }}}, nil }} - region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID()) + region, _ := s.cache.searchCachedRegionByID(regionLoc.Region.GetID()) s.True(region.isValid()) req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil) @@ -1847,8 +1852,8 @@ func (s *testRegionRequestToThreeStoresSuite) TestLeaderStuck() { } func (s *testRegionRequestToThreeStoresSuite) TestTiKVRecoveredFromDown() { - s.onClosed = func() { SetRegionCacheTTLSec(600) } - SetRegionCacheTTLSec(2) + s.onClosed = func() { SetRegionCacheTTLWithJitter(600, 60) } + SetRegionCacheTTLWithJitter(2, 0) bo := retry.NewBackoffer(context.Background(), -1) key := []byte("key") diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go index f4a9881b71..6a92b52767 100644 --- a/internal/locate/region_request_state_test.go +++ b/internal/locate/region_request_state_test.go @@ -557,9 +557,7 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon s.Nil(err) s.NotNil(regionLoc) - s.cache.mu.RLock() - region := s.cache.getRegionByIDFromCache(s.regionID) - s.cache.mu.RUnlock() + region, _ := s.cache.searchCachedRegionByID(s.regionID) defer func() { var ( valid bool @@ -662,10 +660,9 @@ func followerDown(s *testRegionCacheStaleReadSuite) { } func followerDownAndUp(s *testRegionCacheStaleReadSuite) { - s.cache.mu.RLock() - cachedRegion := s.cache.getRegionByIDFromCache(s.regionID) - s.cache.mu.RUnlock() + cachedRegion, expired := s.cache.searchCachedRegionByID(s.regionID) _, follower := s.getFollower() + s.False(expired) s.NotNil(cachedRegion) s.NotNil(follower) regionStore := cachedRegion.getStore() @@ -751,10 +748,9 @@ func leaderDown(s *testRegionCacheStaleReadSuite) { } func leaderDownAndUp(s *testRegionCacheStaleReadSuite) { - s.cache.mu.RLock() - cachedRegion := s.cache.getRegionByIDFromCache(s.regionID) - s.cache.mu.RUnlock() + cachedRegion, expired := s.cache.searchCachedRegionByID(s.regionID) _, leader := s.getLeader() + s.False(expired) s.NotNil(cachedRegion) s.NotNil(leader) regionStore := cachedRegion.getStore() diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index b79689fb23..72cc1ac4a9 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -277,7 +277,9 @@ func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionWhenCtxCanceled( _, _, err = sender.SendReq(bo, req, region.Region, time.Second) // Check this kind of error won't cause region cache drop. s.Equal(errors.Cause(err), context.Canceled) - s.NotNil(sender.regionCache.getRegionByIDFromCache(s.region)) + r, expired := sender.regionCache.searchCachedRegionByID(s.region) + s.False(expired) + s.NotNil(r) } // cancelContextClient wraps rpcClient and always cancels context before sending requests. @@ -547,7 +549,9 @@ func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCa cancel() _, _, err = sender.SendReq(bo, req, region.Region, 3*time.Second) s.Equal(errors.Cause(err), context.Canceled) - s.NotNil(s.cache.getRegionByIDFromCache(s.region)) + r, expired := sender.regionCache.searchCachedRegionByID(s.region) + s.False(expired) + s.NotNil(r) // Just for covering error code = codes.Canceled. client1 := &cancelContextClient{ @@ -604,8 +608,9 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { // test kv epochNotMatch return empty regions s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: region.Region, Store: &Store{storeID: s.store}}, []*metapb.Region{}) s.Nil(err) - r := s.cache.getRegionByIDFromCache(s.region) - s.Nil(r) + r, expired := s.cache.searchCachedRegionByID(s.region) + s.True(expired) + s.NotNil(r) // refill cache region, err = s.cache.LocateRegionByID(s.bo, s.region) @@ -616,7 +621,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} st := &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true, true) + s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) @@ -626,7 +631,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { v3 := region.Region.confVer + 1 r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}} st = &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true, true) + s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) diff --git a/internal/locate/sorted_btree.go b/internal/locate/sorted_btree.go index 0b56cc584d..5e2b88536a 100644 --- a/internal/locate/sorted_btree.go +++ b/internal/locate/sorted_btree.go @@ -63,21 +63,19 @@ func (s *SortedRegions) ReplaceOrInsert(cachedRegion *Region) *Region { return nil } -// DescendLessOrEqual returns all items that are less than or equal to the key. -func (s *SortedRegions) DescendLessOrEqual(key []byte, isEndKey bool, ts int64) (r *Region) { +// SearchByKey returns the region which contains the key. Note that the region might be expired and it's caller's duty to check the region TTL. +func (s *SortedRegions) SearchByKey(key []byte, isEndKey bool) (r *Region) { s.b.DescendLessOrEqual(newBtreeSearchItem(key), func(item *btreeItem) bool { - r = item.cachedRegion - if isEndKey && bytes.Equal(r.StartKey(), key) { - r = nil // clear result + region := item.cachedRegion + if isEndKey && bytes.Equal(region.StartKey(), key) { return true // iterate next item } - if !r.checkRegionCacheTTL(ts) { - r = nil - return true + if !isEndKey && region.Contains(key) || isEndKey && region.ContainsByEnd(key) { + r = region } return false }) - return r + return } // AscendGreaterOrEqual returns all items that are greater than or equal to the key. diff --git a/metrics/metrics.go b/metrics/metrics.go index 7a9cfcf367..875a3d44e7 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -58,6 +58,7 @@ var ( TiKVLoadSafepointCounter *prometheus.CounterVec TiKVSecondaryLockCleanupFailureCounter *prometheus.CounterVec TiKVRegionCacheCounter *prometheus.CounterVec + TiKVLoadRegionCounter *prometheus.CounterVec TiKVLoadRegionCacheHistogram *prometheus.HistogramVec TiKVLocalLatchWaitTimeHistogram prometheus.Histogram TiKVStatusDuration *prometheus.HistogramVec @@ -128,6 +129,7 @@ const ( LblInternal = "internal" LblGeneral = "general" LblDirection = "direction" + LblReason = "reason" ) func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { @@ -294,6 +296,14 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { ConstLabels: constLabels, }, []string{LblType, LblResult}) + TiKVLoadRegionCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "load_region_total", + Help: "Counter of loading region.", + ConstLabels: constLabels, + }, []string{LblType, LblReason}) + TiKVLoadRegionCacheHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: namespace, @@ -763,6 +773,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVLoadSafepointCounter) prometheus.MustRegister(TiKVSecondaryLockCleanupFailureCounter) prometheus.MustRegister(TiKVRegionCacheCounter) + prometheus.MustRegister(TiKVLoadRegionCounter) prometheus.MustRegister(TiKVLoadRegionCacheHistogram) prometheus.MustRegister(TiKVLocalLatchWaitTimeHistogram) prometheus.MustRegister(TiKVStatusDuration) diff --git a/tikv/region.go b/tikv/region.go index 6b5e4874d5..7753d4478b 100644 --- a/tikv/region.go +++ b/tikv/region.go @@ -194,11 +194,17 @@ func NewRegionRequestRuntimeStats() RegionRequestRuntimeStats { return locate.NewRegionRequestRuntimeStats() } -// SetRegionCacheTTLSec sets regionCacheTTLSec to t. +// SetRegionCacheTTLSec sets the base value of region cache TTL. +// Deprecated: use SetRegionCacheTTLWithJitter instead. func SetRegionCacheTTLSec(t int64) { locate.SetRegionCacheTTLSec(t) } +// SetRegionCacheTTLWithJitter sets region cache TTL with jitter. The real TTL is in range of [base, base+jitter). +func SetRegionCacheTTLWithJitter(base int64, jitter int64) { + locate.SetRegionCacheTTLWithJitter(base, jitter) +} + // SetStoreLivenessTimeout sets storeLivenessTimeout to t. func SetStoreLivenessTimeout(t time.Duration) { locate.SetStoreLivenessTimeout(t)