region_cache: no need to update metrics when inserting regions from scans (#1077)
* set false when scan region

Signed-off-by: Cabinfever_B <[email protected]>

* set false when scan region

Signed-off-by: Cabinfever_B <[email protected]>

---------

Signed-off-by: Cabinfever_B <[email protected]>
CabinfeverB authored Dec 18, 2023
1 parent e80e9ca commit 8deef63
Showing 4 changed files with 36 additions and 26 deletions.
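
In short: `insertRegionToCache` gains a `shouldCount` flag. On-demand paths (cache-miss loads, need-reload, epoch mismatch, bucket updates) pass `true` and keep counting invalidations, while background paths (`newRegionIndexMu`, `reloadRegion`, and the scan inserts in `BatchLoadRegionsWithKeyRange`) pass `false`, so bulk replacement of still-valid regions no longer inflates the metric. A minimal, self-contained sketch of that contract; the function and counter below are stand-ins, not the real client-go API:

package main

import (
	"fmt"
	"sync/atomic"
)

// invalidations stands in for the cache's invalidation metric.
var invalidations int64

// insertRegionToCache mimics the new contract: an existing entry is
// invalidated whenever invalidateOldRegion is set, but the metric moves
// only when shouldCount is also set.
func insertRegionToCache(invalidateOldRegion, shouldCount bool) {
	if !invalidateOldRegion {
		return // the old region stays valid; nothing to count either way
	}
	if shouldCount {
		atomic.AddInt64(&invalidations, 1)
	}
	// ... the old region is marked stale in both branches ...
}

func main() {
	insertRegionToCache(true, true)  // cache-miss load: counted
	insertRegionToCache(true, false) // scan-driven bulk insert: not counted
	fmt.Println(atomic.LoadInt64(&invalidations)) // prints 1
}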
36 changes: 23 additions & 13 deletions internal/locate/region_cache.go
@@ -408,6 +408,12 @@ func (r *Region) invalidate(reason InvalidReason) {
atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
}

+// invalidateWithoutMetrics invalidates a region without updating metrics; the next access will get a null result.
+func (r *Region) invalidateWithoutMetrics(reason InvalidReason) {
+	atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
+	atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
+}

// scheduleReload schedules reload region request in next LocateKey.
func (r *Region) scheduleReload() {
oldValue := atomic.LoadInt32(&r.syncFlag)
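
The added function is a copy of `invalidate` minus its metrics update; the body of `invalidate` is collapsed in this hunk, and that it bumps a counter is inferred from the commit title. A standalone sketch under that assumption, with a plain int64 standing in for the real Prometheus counter:

package main

import (
	"fmt"
	"sync/atomic"
)

const invalidatedLastAccessTime = -1 // illustrative sentinel, not the real value

type InvalidReason int32

const Other InvalidReason = 1

type Region struct {
	lastAccess    int64 // int64 first so atomic ops stay aligned on 32-bit targets
	invalidReason InvalidReason
}

var invalidateCounter int64 // stand-in for the assumed metrics counter

// invalidate: count the event, then flag the region stale.
func (r *Region) invalidate(reason InvalidReason) {
	atomic.AddInt64(&invalidateCounter, 1) // the only difference between the two paths
	atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
	atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
}

// invalidateWithoutMetrics: same stores, no counting.
func (r *Region) invalidateWithoutMetrics(reason InvalidReason) {
	atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
	atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
}

func main() {
	r := &Region{}
	r.invalidate(Other)
	r.invalidateWithoutMetrics(Other)
	fmt.Println(atomic.LoadInt64(&invalidateCounter)) // prints 1: only the first call counted
}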
@@ -448,7 +454,7 @@ func newRegionIndexMu(rs []*Region) *regionIndexMu {
r.latestVersions = make(map[uint64]RegionVerID)
r.sorted = NewSortedRegions(btreeDegree)
for _, region := range rs {
-	r.insertRegionToCache(region, true)
+	r.insertRegionToCache(region, true, false)
}
return r
}
@@ -552,8 +558,8 @@ func (c *RegionCache) clear() {
}

// not thread safe; callers must hold the lock
-func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
-	c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion)
+func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) {
+	c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion, shouldCount)
}

// Close releases region cache's resource.
@@ -1107,7 +1113,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
r = lr
c.mu.Lock()
-	c.insertRegionToCache(r, true)
+	c.insertRegionToCache(r, true, true)
c.mu.Unlock()
} else if r.checkNeedReloadAndMarkUpdated() {
// load region when it is marked as needing reload.
@@ -1121,7 +1127,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID())
r = lr
c.mu.Lock()
-	c.insertRegionToCache(r, true)
+	c.insertRegionToCache(r, true, true)
c.mu.Unlock()
}
}
@@ -1262,7 +1268,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
} else {
r = lr
c.mu.Lock()
-	c.insertRegionToCache(r, true)
+	c.insertRegionToCache(r, true, true)
c.mu.Unlock()
}
}
@@ -1281,7 +1287,7 @@ }
}

c.mu.Lock()
-	c.insertRegionToCache(r, true)
+	c.insertRegionToCache(r, true, true)
c.mu.Unlock()
return &KeyLocation{
Region: r.VerID(),
@@ -1319,7 +1325,7 @@ func (c *RegionCache) reloadRegion(regionID uint64) {
return
}
c.mu.Lock()
-	c.insertRegionToCache(lr, false)
+	c.insertRegionToCache(lr, false, false)
c.mu.Unlock()
}

@@ -1409,7 +1415,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey
// TODO(youjiali1995): scanRegions always fetches regions from PD, and these regions don't contain buckets information
// (PD omits it to reduce traffic), so newly inserted regions in the region cache don't have buckets information. We should improve it.
for _, region := range regions {
-	c.insertRegionToCache(region, true)
+	c.insertRegionToCache(region, true, false)
}

return
@@ -1485,7 +1491,7 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin
// It should be protected by c.mu.l.Lock().
// if `invalidateOldRegion` is false, the old region cache should be still valid,
// and it may still be used by some kv requests.
-func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
+func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) {
oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion)
if oldRegion != nil {
store := cachedRegion.getStore()
@@ -1503,7 +1509,11 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
-	oldRegion.invalidate(Other)
+	if shouldCount {
+		oldRegion.invalidate(Other)
+	} else {
+		oldRegion.invalidateWithoutMetrics(Other)
+	}
}
// Don't refresh TiFlash work idx for the region. Otherwise, it will always go to an invalid store which
// is transferring regions.
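
Both arms of this branch perform the identical invalidation; only the metric is gated. One way to make that property explicit is the method-value form below, a behavior-equivalent sketch with stand-in types rather than the real `Region`:

package main

import "fmt"

type InvalidReason int

const Other InvalidReason = 1

// region carries just enough state to observe which path ran.
type region struct{ metricBumped bool }

func (r *region) invalidate(reason InvalidReason)               { r.metricBumped = true }
func (r *region) invalidateWithoutMetrics(reason InvalidReason) {}

// invalidateOld mirrors the branch above: pick the invalidation function
// once, then call it unconditionally.
func invalidateOld(r *region, shouldCount bool) {
	fn := r.invalidateWithoutMetrics
	if shouldCount {
		fn = r.invalidate
	}
	fn(Other)
}

func main() {
	counted, uncounted := &region{}, &region{}
	invalidateOld(counted, true)
	invalidateOld(uncounted, false)
	fmt.Println(counted.metricBumped, uncounted.metricBumped) // true false
}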
@@ -2049,7 +2059,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext

c.mu.Lock()
for _, region := range newRegions {
-	c.insertRegionToCache(region, true)
+	c.insertRegionToCache(region, true, true)
}
c.mu.Unlock()

@@ -2167,7 +2177,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
return
}
c.mu.Lock()
-	c.insertRegionToCache(new, true)
+	c.insertRegionToCache(new, true, true)
c.mu.Unlock()
}()
}
18 changes: 9 additions & 9 deletions internal/locate/region_cache_test.go
@@ -1017,7 +1017,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() {
region := createSampleRegion([]byte("k1"), []byte("k2"))
region.meta.Id = 1
region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10}
-	cache.insertRegionToCache(region, true)
+	cache.insertRegionToCache(region, true, true)

r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}}
r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}}
@@ -1308,7 +1308,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
filterUnavailablePeers(cpRegion)
region, err := newRegion(s.bo, s.cache, cpRegion)
s.Nil(err)
-	s.cache.insertRegionToCache(region, true)
+	s.cache.insertRegionToCache(region, true, true)

// OnSendFail should not panic
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
@@ -1344,7 +1344,7 @@ func (s *testRegionCacheSuite) TestPeersLenChangedByWitness() {
cpRegion := &pd.Region{Meta: cpMeta}
region, err := newRegion(s.bo, s.cache, cpRegion)
s.Nil(err)
-	s.cache.insertRegionToCache(region, true)
+	s.cache.insertRegionToCache(region, true, true)

// OnSendFail should not panic
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
@@ -1517,12 +1517,12 @@ func (s *testRegionCacheSuite) TestBuckets() {
fakeRegion.setStore(cachedRegion.getStore().clone())
// no buckets
fakeRegion.getStore().buckets = nil
-	s.cache.insertRegionToCache(fakeRegion, true)
+	s.cache.insertRegionToCache(fakeRegion, true, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
// stale buckets
fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1}
-	s.cache.insertRegionToCache(fakeRegion, true)
+	s.cache.insertRegionToCache(fakeRegion, true, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
// new buckets
@@ -1532,7 +1532,7 @@
Keys: buckets.Keys,
}
fakeRegion.getStore().buckets = newBuckets
-	s.cache.insertRegionToCache(fakeRegion, true)
+	s.cache.insertRegionToCache(fakeRegion, true, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(newBuckets, cachedRegion.getStore().buckets)

@@ -1665,7 +1665,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
region, err := s.cache.loadRegion(s.bo, []byte("c"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
-	s.cache.insertRegionToCache(region, true)
+	s.cache.insertRegionToCache(region, true, true)
loc, err = s.cache.LocateKey(s.bo, []byte{'c'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
@@ -1676,7 +1676,7 @@
region, err = s.cache.loadRegion(s.bo, []byte("e"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
-	s.cache.insertRegionToCache(region, true)
+	s.cache.insertRegionToCache(region, true, true)
loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
@@ -1799,7 +1799,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() {
v2 := region.Region.confVer + 1
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
st := &Store{storeID: s.store}
-	s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
+	s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true, true)

r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
s.Equal(len(r), 2)
4 changes: 2 additions & 2 deletions internal/locate/region_request3_test.go
@@ -332,7 +332,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
cache := NewRegionCache(s.cache.pdClient)
defer cache.Close()
cache.mu.Lock()
-	cache.insertRegionToCache(region, true)
+	cache.insertRegionToCache(region, true, true)
cache.mu.Unlock()

// Test accessFollower state with kv.ReplicaReadLearner request type.
@@ -383,7 +383,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
cache := NewRegionCache(s.cache.pdClient)
defer cache.Close()
cache.mu.Lock()
-	cache.insertRegionToCache(region, true)
+	cache.insertRegionToCache(region, true, true)
cache.mu.Unlock()

// Verify creating the replicaSelector.
4 changes: 2 additions & 2 deletions internal/locate/region_request_test.go
@@ -616,7 +616,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
v2 := region.Region.confVer + 1
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
st := &Store{storeID: s.store}
-	s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
+	s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true, true)
region, err = s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
@@ -626,7 +626,7 @@
v3 := region.Region.confVer + 1
r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}}
st = &Store{storeID: s.store}
-	s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
+	s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true, true)
region, err = s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
