Skip to content

Commit

Permalink
Cherry pick some patches to 7.1 (#940)
Browse files Browse the repository at this point in the history
* reload region cache when store is resolved from invalid status (#843)

Signed-off-by: you06 <[email protected]>
Co-authored-by: disksing <[email protected]>

* fallback to follower when leader is busy (#916) (#923)

* fallback to follower when leader is busy

Signed-off-by: you06 <[email protected]>
Co-authored-by: cfzjywxk <[email protected]>
Co-authored-by: cfzjywxk <[email protected]>

* Resume max retry time check for stale read retry with leader option(#903) (#911)

* Resume max retry time check for stale read retry with leader option

Signed-off-by: cfzjywxk <[email protected]>

* add cancel

Signed-off-by: cfzjywxk <[email protected]>

---------

Signed-off-by: cfzjywxk <[email protected]>

* add region cache state test & fix some issues of replica selector (#910)

Signed-off-by: you06 <[email protected]>

remove duplicate code

Signed-off-by: you06 <[email protected]>

* enable workflow for tidb-7.1

Signed-off-by: you06 <[email protected]>

* update

Signed-off-by: you06 <[email protected]>

update

Signed-off-by: you06 <[email protected]>

fix test

Signed-off-by: you06 <[email protected]>

fix test

Signed-off-by: you06 <[email protected]>

* lint

Signed-off-by: you06 <[email protected]>

* lint

Signed-off-by: you06 <[email protected]>

* fix flaky test

Signed-off-by: you06 <[email protected]>

---------

Signed-off-by: you06 <[email protected]>
Signed-off-by: cfzjywxk <[email protected]>
Co-authored-by: disksing <[email protected]>
Co-authored-by: cfzjywxk <[email protected]>
Co-authored-by: cfzjywxk <[email protected]>
  • Loading branch information
4 people authored Sep 26, 2023
1 parent 0108750 commit 83165bd
Show file tree
Hide file tree
Showing 11 changed files with 1,269 additions and 70 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Integration Test

on:
push:
branches: [ master ]
branches: [ master, tidb-7.1 ]
pull_request:
branches: [ master ]
branches: [ master, tidb-7.1 ]

jobs:

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Unit Test

on:
push:
branches: [ master ]
branches: [ master, tidb-7.1 ]
pull_request:
branches: [ master ]
branches: [ master, tidb-7.1 ]

jobs:
test:
Expand Down
2 changes: 1 addition & 1 deletion error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ type ErrAssertionFailed struct {
*kvrpcpb.AssertionFailed
}

// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues is not.
// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues` is not.
type ErrLockOnlyIfExistsNoReturnValue struct {
StartTS uint64
ForUpdateTs uint64
Expand Down
95 changes: 80 additions & 15 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type Region struct {
syncFlag int32 // region need be sync in next turn
lastAccess int64 // last region access time, see checkRegionCacheTTL
invalidReason InvalidReason // the reason why the region is invalidated
asyncReload atomic.Bool // the region need to be reloaded in async mode
}

// AccessIndex represent the index for accessIndex array
Expand Down Expand Up @@ -411,7 +412,7 @@ func newRegionIndexMu(rs []*Region) *regionIndexMu {
r.latestVersions = make(map[uint64]RegionVerID)
r.sorted = NewSortedRegions(btreeDegree)
for _, region := range rs {
r.insertRegionToCache(region)
r.insertRegionToCache(region, true)
}
return r
}
Expand Down Expand Up @@ -457,6 +458,11 @@ type RegionCache struct {
// requestLiveness always returns unreachable.
mockRequestLiveness atomic.Pointer[livenessFunc]
}

regionsNeedReload struct {
sync.Mutex
regions []uint64
}
}

// NewRegionCache creates a RegionCache.
Expand Down Expand Up @@ -510,20 +516,27 @@ func (c *RegionCache) clear() {
}

// thread unsafe, should use with lock
func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
c.mu.insertRegionToCache(cachedRegion)
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion)
}

// Close releases region cache's resource.
func (c *RegionCache) Close() {
c.cancelFunc()
}

var reloadRegionInterval = int64(10 * time.Second)

// asyncCheckAndResolveLoop with
func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
reloadRegionTicker := time.NewTicker(time.Duration(atomic.LoadInt64(&reloadRegionInterval)))
defer func() {
ticker.Stop()
reloadRegionTicker.Stop()
}()
var needCheckStores []*Store
reloadNextLoop := make(map[uint64]struct{})
for {
needCheckStores = needCheckStores[:0]
select {
Expand All @@ -541,6 +554,21 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
// there's a deleted store in the stores map which guaranteed by reReslve().
return state != unresolved && state != tombstone && state != deleted
})
case <-reloadRegionTicker.C:
for regionID := range reloadNextLoop {
c.reloadRegion(regionID)
delete(reloadNextLoop, regionID)
}
c.regionsNeedReload.Lock()
for _, regionID := range c.regionsNeedReload.regions {
// will reload in next tick, wait a while for two reasons:
// 1. there may an unavailable duration while recreating the connection.
// 2. the store may just be started, and wait safe ts synced to avoid the
// possible dataIsNotReady error.
reloadNextLoop[regionID] = struct{}{}
}
c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0]
c.regionsNeedReload.Unlock()
}
}
}
Expand Down Expand Up @@ -1043,7 +1071,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
} else if r.checkNeedReloadAndMarkUpdated() {
// load region when it be marked as need reload.
Expand All @@ -1056,7 +1084,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID())
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
}
}
Expand Down Expand Up @@ -1197,7 +1225,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
} else {
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
}
}
Expand All @@ -1216,7 +1244,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
}

c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
return &KeyLocation{
Region: r.VerID(),
Expand All @@ -1226,6 +1254,38 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
}, nil
}

func (c *RegionCache) scheduleReloadRegion(region *Region) {
if region == nil || !region.asyncReload.CompareAndSwap(false, true) {
// async reload scheduled by other thread.
return
}
regionID := region.GetID()
if regionID > 0 {
c.regionsNeedReload.Lock()
c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, regionID)
c.regionsNeedReload.Unlock()
}
}

func (c *RegionCache) reloadRegion(regionID uint64) {
bo := retry.NewNoopBackoff(context.Background())
lr, err := c.loadRegionByID(bo, regionID)
if err != nil {
// ignore error and use old region info.
logutil.Logger(bo.GetCtx()).Error("load region failure",
zap.Uint64("regionID", regionID), zap.Error(err))
c.mu.RLock()
if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
oldRegion.asyncReload.Store(false)
}
c.mu.RUnlock()
return
}
c.mu.Lock()
c.insertRegionToCache(lr, false)
c.mu.Unlock()
}

// GroupKeysByRegion separates keys into groups by their belonging Regions.
// Specially it also returns the first key's region which may be used as the
// 'PrimaryLockKey' and should be committed ahead of others.
Expand Down Expand Up @@ -1310,7 +1370,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey
// TODO(youjiali1995): scanRegions always fetch regions from PD and these regions don't contain buckets information
// for less traffic, so newly inserted regions in region cache don't have buckets information. We should improve it.
for _, region := range regions {
c.insertRegionToCache(region)
c.insertRegionToCache(region, true)
}

return
Expand Down Expand Up @@ -1384,7 +1444,9 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin

// insertRegionToCache tries to insert the Region to cache.
// It should be protected by c.mu.l.Lock().
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) {
// if `invalidateOldRegion` is false, the old region cache should be still valid,
// and it may still be used by some kv requests.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion)
if oldRegion != nil {
store := cachedRegion.getStore()
Expand All @@ -1399,8 +1461,11 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) {
if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
}
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
}
// Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
// is under transferring regions.
store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())
Expand Down Expand Up @@ -1919,7 +1984,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext

c.mu.Lock()
for _, region := range newRegions {
c.insertRegionToCache(region)
c.insertRegionToCache(region, true)
}
c.mu.Unlock()

Expand Down Expand Up @@ -2037,7 +2102,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
return
}
c.mu.Lock()
c.insertRegionToCache(new)
c.insertRegionToCache(new, true)
c.mu.Unlock()
}()
}
Expand Down Expand Up @@ -2525,8 +2590,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
}

func (s *Store) getResolveState() resolveState {
var state resolveState
if s == nil {
var state resolveState
return state
}
return resolveState(atomic.LoadUint64(&s.state))
Expand Down
18 changes: 9 additions & 9 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() {
region := createSampleRegion([]byte("k1"), []byte("k2"))
region.meta.Id = 1
region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10}
cache.insertRegionToCache(region)
cache.insertRegionToCache(region, true)

r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}}
r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}}
Expand Down Expand Up @@ -1257,7 +1257,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
filterUnavailablePeers(cpRegion)
region, err := newRegion(s.bo, s.cache, cpRegion)
s.Nil(err)
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)

// OnSendFail should not panic
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
Expand Down Expand Up @@ -1293,7 +1293,7 @@ func (s *testRegionCacheSuite) TestPeersLenChangedByWitness() {
cpRegion := &pd.Region{Meta: cpMeta}
region, err := newRegion(s.bo, s.cache, cpRegion)
s.Nil(err)
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)

// OnSendFail should not panic
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
Expand Down Expand Up @@ -1466,12 +1466,12 @@ func (s *testRegionCacheSuite) TestBuckets() {
fakeRegion.setStore(cachedRegion.getStore().clone())
// no buckets
fakeRegion.getStore().buckets = nil
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
// stale buckets
fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1}
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
// new buckets
Expand All @@ -1481,7 +1481,7 @@ func (s *testRegionCacheSuite) TestBuckets() {
Keys: buckets.Keys,
}
fakeRegion.getStore().buckets = newBuckets
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(newBuckets, cachedRegion.getStore().buckets)

Expand Down Expand Up @@ -1614,7 +1614,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
region, err := s.cache.loadRegion(s.bo, []byte("c"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)
loc, err = s.cache.LocateKey(s.bo, []byte{'c'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
Expand All @@ -1625,7 +1625,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
region, err = s.cache.loadRegion(s.bo, []byte("e"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)
loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
Expand Down Expand Up @@ -1739,7 +1739,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() {
v2 := region.Region.confVer + 1
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
st := &Store{storeID: s.store}
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()})
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)

r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
s.Equal(len(r), 2)
Expand Down
Loading

0 comments on commit 83165bd

Please sign in to comment.