diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 1dc50b8bce..f6bd35b367 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -48,4 +48,3 @@ jobs:
         uses: golangci/golangci-lint-action@v3
         with:
           version: v1.55.2
-
diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 4c9ed782c5..3b802f4525 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -41,7 +41,9 @@ import (
 	"fmt"
 	"math"
 	"math/rand"
+	"slices"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -120,8 +122,10 @@ func SetRegionCacheTTLSec(t int64) {
 }
 
 const (
-	updated  int32 = iota // region is updated and no need to reload.
-	needSync              // need sync new region info.
+	needReloadOnAccess       int32 = 1 << iota // indicates the region will be reloaded on next access
+	needExpireAfterTTL                         // indicates the region will expire after RegionCacheTTL (even when it's accessed continuously)
+	needDelayedReloadPending                   // indicates the region will be reloaded later after it's scanned by GC
+	needDelayedReloadReady                     // indicates the region has been scanned by GC and can be reloaded by id on next access
 )
 
 // InvalidReason is the reason why a cached region is invalidated.
@@ -148,14 +152,11 @@ const (
 
 // Region presents kv region
 type Region struct {
-	meta                       *metapb.Region // raw region meta from PD, immutable after init
-	store                      unsafe.Pointer // point to region store info, see RegionStore
-	syncFlag                   int32          // region need be sync in next turn
-	lastAccess                 int64          // last region access time, see checkRegionCacheTTL
-	invalidReason              InvalidReason  // the reason why the region is invalidated
-	asyncReload                atomic.Bool    // the region need to be reloaded in async mode
-	lastLoad                   int64          // last region load time
-	hasUnavailableTiFlashStore bool           // has unavailable TiFlash store, if yes, need to trigger async reload periodically
+	meta          *metapb.Region // raw region meta from PD, immutable after init
+	store         unsafe.Pointer // point to region store info, see RegionStore
+	lastAccess    int64          // last region access time, see checkRegionCacheTTL
+	syncFlags     int32          // region needs to be synced later, see needReloadOnAccess, needExpireAfterTTL
+	invalidReason InvalidReason  // the reason why the region is invalidated
 }
 
 // AccessIndex represent the index for accessIndex array
@@ -180,9 +181,10 @@ type regionStore struct {
 	// buckets is not accurate and it can change even if the region is not changed.
 	// It can be stale and buckets keys can be out of the region range.
 	buckets *metapb.Buckets
-	// record all storeIDs on which pending peers reside.
-	// key is storeID, val is peerID.
-	pendingTiFlashPeerStores map[uint64]uint64
+	// pendingPeers refers to pdRegion.PendingPeers. It's immutable and can be used to reconstruct pdRegions.
+	pendingPeers []*metapb.Peer
+	// downPeers refers to pdRegion.DownPeers. It's immutable and can be used to reconstruct pdRegions.
+	downPeers []*metapb.Peer
 }
 
 func (r *regionStore) accessStore(mode accessMode, idx AccessIndex) (int, *Store) {
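Editor's note (not part of the patch): the old two-state `updated`/`needSync` enum becomes a set of independent bit flags, so several conditions can be recorded on one region at the same time. A minimal standalone sketch of the same `1 << iota` pattern — the `region` type, `flag*` names and `checkFlags` helper below are illustrative only:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Independent conditions encoded as single bits, mirroring the patch's
// needReloadOnAccess / needExpireAfterTTL / needDelayedReload* constants.
const (
	flagReloadOnAccess       int32 = 1 << iota // 0b0001
	flagExpireAfterTTL                         // 0b0010
	flagDelayedReloadPending                   // 0b0100
	flagDelayedReloadReady                     // 0b1000
)

type region struct{ syncFlags int32 }

// checkFlags reports whether ANY of the given bits is currently set.
func (r *region) checkFlags(flags int32) bool {
	return atomic.LoadInt32(&r.syncFlags)&flags > 0
}

func main() {
	r := &region{}
	atomic.StoreInt32(&r.syncFlags, flagExpireAfterTTL|flagDelayedReloadPending)
	fmt.Println(r.checkFlags(flagReloadOnAccess))                            // false
	fmt.Println(r.checkFlags(flagExpireAfterTTL))                            // true
	fmt.Println(r.checkFlags(flagReloadOnAccess | flagDelayedReloadPending)) // true: "any of", not "all of"
}
```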
@@ -275,12 +277,13 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
 	// regionStore pull used store from global store map
 	// to avoid acquire storeMu in later access.
 	rs := &regionStore{
-		workTiKVIdx:              0,
-		proxyTiKVIdx:             -1,
-		stores:                   make([]*Store, 0, len(r.meta.Peers)),
-		pendingTiFlashPeerStores: map[uint64]uint64{},
-		storeEpochs:              make([]uint32, 0, len(r.meta.Peers)),
-		buckets:                  pdRegion.Buckets,
+		workTiKVIdx:  0,
+		proxyTiKVIdx: -1,
+		stores:       make([]*Store, 0, len(r.meta.Peers)),
+		storeEpochs:  make([]uint32, 0, len(r.meta.Peers)),
+		buckets:      pdRegion.Buckets,
+		pendingPeers: pdRegion.PendingPeers,
+		downPeers:    pdRegion.DownPeers,
 	}
 
 	leader := pdRegion.Leader
@@ -295,8 +298,8 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
 		if err != nil {
 			return nil, err
 		}
-		// Filter the peer on a tombstone store.
-		if addr == "" {
+		// Filter out the peer on a tombstone or down store.
+		if addr == "" || slices.ContainsFunc(pdRegion.DownPeers, func(dp *metapb.Peer) bool { return isSamePeer(dp, p) }) {
 			continue
 		}
 
@@ -319,11 +322,6 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
 		}
 		rs.stores = append(rs.stores, store)
 		rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
-		for _, pendingPeer := range pdRegion.PendingPeers {
-			if pendingPeer.Id == p.Id {
-				rs.pendingTiFlashPeerStores[store.storeID] = p.Id
-			}
-		}
 	}
 	// TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
 	// Maybe we need backoff here.
@@ -331,34 +329,16 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
 		return nil, errors.Errorf("no available peers, region: {%v}", r.meta)
 	}
 
-	for _, p := range pdRegion.DownPeers {
-		store, exists := c.getStore(p.StoreId)
-		if !exists {
-			store = c.getStoreOrInsertDefault(p.StoreId)
-		}
-		addr, err := store.initResolve(bo, c)
-		if err != nil {
-			continue
-		}
-		// Filter the peer on a tombstone store.
-		if addr == "" {
-			continue
-		}
-
-		if store.storeType == tikvrpc.TiFlash {
-			r.hasUnavailableTiFlashStore = true
-			break
-		}
-	}
-
 	rs.workTiKVIdx = leaderAccessIdx
-	r.meta.Peers = availablePeers
-
 	r.setStore(rs)
+	r.meta.Peers = availablePeers
+	// if the region has down peers, let it expire after TTL.
+	if len(pdRegion.DownPeers) > 0 {
+		r.syncFlags |= needExpireAfterTTL
+	}
 
 	// mark region has been init accessed.
 	r.lastAccess = time.Now().Unix()
-	r.lastLoad = r.lastAccess
 	return r, nil
 }
@@ -391,7 +371,7 @@ func (r *Region) checkRegionCacheTTL(ts int64) bool {
 		if ts-lastAccess > regionCacheTTLSec {
 			return false
 		}
-		if atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) {
+		if r.checkSyncFlags(needExpireAfterTTL) || atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) {
 			return true
 		}
 	}
@@ -410,31 +390,43 @@ func (r *Region) invalidateWithoutMetrics(reason InvalidReason) {
 	atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
 }
 
-// scheduleReload schedules reload region request in next LocateKey.
-func (r *Region) scheduleReload() {
-	oldValue := atomic.LoadInt32(&r.syncFlag)
-	if oldValue != updated {
-		return
-	}
-	atomic.CompareAndSwapInt32(&r.syncFlag, oldValue, needSync)
+func (r *Region) getSyncFlags() int32 {
+	return atomic.LoadInt32(&r.syncFlags)
 }
 
-// checkNeedReloadAndMarkUpdated returns whether the region need reload and marks the region to be updated.
-func (r *Region) checkNeedReloadAndMarkUpdated() bool {
-	oldValue := atomic.LoadInt32(&r.syncFlag)
-	if oldValue == updated {
-		return false
+// checkSyncFlags returns true if sync_flags contains any of flags.
+func (r *Region) checkSyncFlags(flags int32) bool {
+	return atomic.LoadInt32(&r.syncFlags)&flags > 0
+}
+
+// setSyncFlags sets the sync_flags bits to sync_flags|flags.
+func (r *Region) setSyncFlags(flags int32) {
+	for {
+		oldFlags := atomic.LoadInt32(&r.syncFlags)
+		if oldFlags&flags == flags {
+			return
+		}
+		if atomic.CompareAndSwapInt32(&r.syncFlags, oldFlags, oldFlags|flags) {
+			return
+		}
 	}
-	return atomic.CompareAndSwapInt32(&r.syncFlag, oldValue, updated)
 }
 
-func (r *Region) checkNeedReload() bool {
-	v := atomic.LoadInt32(&r.syncFlag)
-	return v != updated
+// resetSyncFlags clears the given flags from sync_flags (that is sync_flags&^flags) and returns the flags that were cleared (0 means none of them were set).
+func (r *Region) resetSyncFlags(flags int32) int32 {
+	for {
+		oldFlags := atomic.LoadInt32(&r.syncFlags)
+		if oldFlags&flags == 0 {
+			return 0
+		}
+		if atomic.CompareAndSwapInt32(&r.syncFlags, oldFlags, oldFlags&^flags) {
+			return oldFlags & flags
+		}
+	}
 }
 
 func (r *Region) isValid() bool {
-	return r != nil && !r.checkNeedReload() && r.checkRegionCacheTTL(time.Now().Unix())
+	return r != nil && !r.checkSyncFlags(needReloadOnAccess) && r.checkRegionCacheTTL(time.Now().Unix())
 }
 
 type regionIndexMu struct {
@@ -490,17 +482,13 @@ type RegionCache struct {
 	// Context for background jobs
 	ctx        context.Context
 	cancelFunc context.CancelFunc
+	wg         sync.WaitGroup
 
 	testingKnobs struct {
 		// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
 		// requestLiveness always returns unreachable.
 		mockRequestLiveness atomic.Pointer[livenessFunc]
 	}
-
-	regionsNeedReload struct {
-		sync.Mutex
-		regions []uint64
-	}
 }
 
 // NewRegionCache creates a RegionCache.
@@ -531,16 +519,21 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 		c.mu = *newRegionIndexMu(nil)
 	}
 
+	// TODO(zyguan): refine management of background cron jobs
+	c.wg.Add(1)
 	go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
 	c.enableForwarding = config.GetGlobalConfig().EnableForwarding
 	// Default use 15s as the update inerval.
+	c.wg.Add(1)
 	go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second)
 	if config.GetGlobalConfig().RegionsRefreshInterval > 0 {
 		c.timelyRefreshCache(config.GetGlobalConfig().RegionsRefreshInterval)
 	} else {
 		// cacheGC is not compatible with timelyRefreshCache
+		c.wg.Add(1)
		go c.cacheGC()
 	}
+	c.wg.Add(1)
 	go c.asyncReportStoreReplicaFlows(time.Duration(interval/2) * time.Second)
 	return c
 }
@@ -559,7 +552,7 @@ func newTestRegionCache() *RegionCache {
 
 // clear clears all cached data in the RegionCache. It's only used in tests.
 func (c *RegionCache) clear() {
-	c.mu = *newRegionIndexMu(nil)
+	c.mu.refresh(nil)
 	c.clearStores()
 }
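Editor's note (not part of the patch): the new `wg sync.WaitGroup` plus the `c.wg.Add(1)` calls above give the cache a clean shutdown path — every background loop is registered before its goroutine starts and signals completion when the shared context is cancelled, so `Close` can block until all of them have exited. A minimal sketch of that lifecycle pattern under the same assumptions; the `cache` type and `startLoop` helper are made-up illustrations:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type cache struct {
	ctx        context.Context
	cancelFunc context.CancelFunc
	wg         sync.WaitGroup
}

// startLoop registers the goroutine with the WaitGroup before it starts,
// so Close can reliably wait for it even if it exits quickly.
func (c *cache) startLoop(name string, interval time.Duration) {
	c.wg.Add(1)
	go func() {
		ticker := time.NewTicker(interval)
		defer func() {
			c.wg.Done()
			ticker.Stop()
		}()
		for {
			select {
			case <-c.ctx.Done():
				return
			case <-ticker.C:
				fmt.Println("tick:", name)
			}
		}
	}()
}

// Close cancels the shared context and blocks until every loop has returned.
func (c *cache) Close() {
	c.cancelFunc()
	c.wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	c := &cache{ctx: ctx, cancelFunc: cancel}
	c.startLoop("gc", 10*time.Millisecond)
	c.startLoop("slow-score", 15*time.Millisecond)
	time.Sleep(50 * time.Millisecond)
	c.Close() // returns only after both loops observed ctx.Done()
}
```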
@@ -571,20 +564,17 @@
 // Close releases region cache's resource.
 func (c *RegionCache) Close() {
 	c.cancelFunc()
+	c.wg.Wait()
 }
 
-var reloadRegionInterval = int64(10 * time.Second)
-
 // asyncCheckAndResolveLoop with
 func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
 	ticker := time.NewTicker(interval)
-	reloadRegionTicker := time.NewTicker(time.Duration(atomic.LoadInt64(&reloadRegionInterval)))
 	defer func() {
+		c.wg.Done()
 		ticker.Stop()
-		reloadRegionTicker.Stop()
 	}()
 	var needCheckStores []*Store
-	reloadNextLoop := make(map[uint64]struct{})
 	for {
 		needCheckStores = needCheckStores[:0]
 		select {
@@ -602,21 +592,6 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
 				// there's a deleted store in the stores map which guaranteed by reReslve().
 				return state != unresolved && state != tombstone && state != deleted
 			})
-		case <-reloadRegionTicker.C:
-			for regionID := range reloadNextLoop {
-				c.reloadRegion(regionID)
-				delete(reloadNextLoop, regionID)
-			}
-			c.regionsNeedReload.Lock()
-			for _, regionID := range c.regionsNeedReload.regions {
-				// will reload in next tick, wait a while for two reasons:
-				// 1. there may an unavailable duration while recreating the connection.
-				// 2. the store may just be started, and wait safe ts synced to avoid the
-				// possible dataIsNotReady error.
-				reloadNextLoop[regionID] = struct{}{}
-			}
-			c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0]
-			c.regionsNeedReload.Unlock()
 		}
 	}
 }
@@ -874,7 +849,7 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
 		allStores = append(allStores, store.storeID)
 	}
 	for _, storeID := range allStores {
-		if _, ok := regionStore.pendingTiFlashPeerStores[storeID]; !ok {
+		if !slices.ContainsFunc(regionStore.pendingPeers, func(p *metapb.Peer) bool { return p.StoreId == storeID }) {
 			nonPendingStores = append(nonPendingStores, storeID)
 		}
 	}
@@ -891,11 +866,6 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
 		return nil, nil
 	}
 
-	if cachedRegion.hasUnavailableTiFlashStore && time.Now().Unix()-cachedRegion.lastLoad > regionCacheTTLSec {
-		/// schedule an async reload to avoid load balance issue, refer https://github.com/pingcap/tidb/issues/35418 for details
-		c.scheduleReloadRegion(cachedRegion)
-	}
-
 	regionStore := cachedRegion.getStore()
 
 	// sIdx is for load balance of TiFlash store.
@@ -1123,9 +1093,18 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
 			c.insertRegionToCache(r, true, true)
 			c.mu.Unlock()
 		}
-	} else if r.checkNeedReloadAndMarkUpdated() {
+	} else if flags := r.resetSyncFlags(needReloadOnAccess | needDelayedReloadReady); flags > 0 {
 		// load region when it be marked as need reload.
-		lr, err := c.loadRegion(bo, key, isEndKey)
+		reloadOnAccess := flags&needReloadOnAccess > 0
+		var (
+			lr  *Region
+			err error
+		)
+		if reloadOnAccess {
+			lr, err = c.loadRegion(bo, key, isEndKey)
+		} else {
+			lr, err = c.loadRegionByID(bo, r.GetID())
+		}
 		if err != nil {
 			// ignore error and use old region info.
 			logutil.Logger(bo.GetCtx()).Error("load region failure",
@@ -1135,7 +1114,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
 			logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID())
 			r = lr
 			c.mu.Lock()
-			c.insertRegionToCache(r, true, true)
+			c.insertRegionToCache(r, reloadOnAccess, reloadOnAccess)
 			c.mu.Unlock()
 		}
 	}
@@ -1144,7 +1123,7 @@ func (c *RegionCache) tryFindRegionByKey(key []byte, isEndKey bool) (r *Region) {
 	r = c.searchCachedRegion(key, isEndKey)
-	if r == nil || r.checkNeedReloadAndMarkUpdated() {
+	if r == nil || r.checkSyncFlags(needReloadOnAccess) {
 		return nil
 	}
 	return r
@@ -1194,7 +1173,7 @@ func (c *RegionCache) OnSendFailForTiFlash(bo *retry.Backoffer, store *Store, re
 
 	// force reload region when retry all known peers in region.
 	if scheduleReload {
-		r.scheduleReload()
+		r.setSyncFlags(needReloadOnAccess)
 	}
 }
@@ -1256,7 +1235,7 @@ func (c *RegionCache) OnSendFail(bo *retry.Backoffer, ctx *RPCContext, scheduleR
 
 	// force reload region when retry all known peers in region.
 	if scheduleReload {
-		r.scheduleReload()
+		r.setSyncFlags(needReloadOnAccess)
 	}
 }
@@ -1267,7 +1246,8 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
 	r := c.getRegionByIDFromCache(regionID)
 	c.mu.RUnlock()
 	if r != nil {
-		if r.checkNeedReloadAndMarkUpdated() {
+		if flags := r.resetSyncFlags(needReloadOnAccess); flags > 0 {
+			reloadOnAccess := flags&needReloadOnAccess > 0
 			lr, err := c.loadRegionByID(bo, regionID)
 			if err != nil {
 				// ignore error and use old region info.
@@ -1276,7 +1256,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
 			} else {
 				r = lr
 				c.mu.Lock()
-				c.insertRegionToCache(r, true, true)
+				c.insertRegionToCache(r, reloadOnAccess, reloadOnAccess)
 				c.mu.Unlock()
 			}
 		}
@@ -1305,38 +1285,6 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
 	}, nil
 }
 
-func (c *RegionCache) scheduleReloadRegion(region *Region) {
-	if region == nil || !region.asyncReload.CompareAndSwap(false, true) {
-		// async reload scheduled by other thread.
-		return
-	}
-	regionID := region.GetID()
-	if regionID > 0 {
-		c.regionsNeedReload.Lock()
-		c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, regionID)
-		c.regionsNeedReload.Unlock()
-	}
-}
-
-func (c *RegionCache) reloadRegion(regionID uint64) {
-	bo := retry.NewNoopBackoff(context.Background())
-	lr, err := c.loadRegionByID(bo, regionID)
-	if err != nil {
-		// ignore error and use old region info.
-		logutil.Logger(bo.GetCtx()).Error("load region failure",
-			zap.Uint64("regionID", regionID), zap.Error(err))
-		c.mu.RLock()
-		if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
-			oldRegion.asyncReload.Store(false)
-		}
-		c.mu.RUnlock()
-		return
-	}
-	c.mu.Lock()
-	c.insertRegionToCache(lr, false, false)
-	c.mu.Unlock()
-}
-
 // GroupKeysByRegion separates keys into groups by their belonging Regions.
 // Specially it also returns the first key's region which may be used as the
 // 'PrimaryLockKey' and should be committed ahead of others.
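Editor's note (not part of the patch): with `scheduleReloadRegion`/`reloadRegion` removed, reloads now happen lazily on access — `resetSyncFlags` atomically consumes the reload bits so only one caller performs the reload, and the bit that was set decides between a load by key and a cheaper load by region id. A simplified, single-threaded sketch of just that decision (the `consumeReloadFlags` helper is hypothetical and deliberately non-atomic):

```go
package main

import "fmt"

const (
	needReloadOnAccess     int32 = 1 << iota // reload by key and overwrite the cached entry
	needDelayedReloadReady                   // reload by region id, keeping the old entry usable
)

// consumeReloadFlags stands in for Region.resetSyncFlags in the patch: it
// returns which of the requested bits were set and clears them.
func consumeReloadFlags(syncFlags *int32, mask int32) int32 {
	got := *syncFlags & mask
	*syncFlags &^= mask
	return got
}

func main() {
	syncFlags := needDelayedReloadReady // e.g. promoted earlier by the cache GC
	if flags := consumeReloadFlags(&syncFlags, needReloadOnAccess|needDelayedReloadReady); flags > 0 {
		if flags&needReloadOnAccess > 0 {
			fmt.Println("reload by key (loadRegion) and replace the cached region")
		} else {
			fmt.Println("reload by id (loadRegionByID) without invalidating the old entry")
		}
	}
	fmt.Println("flags after consumption:", syncFlags) // 0: a second access won't reload again
}
```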
@@ -1602,7 +1550,7 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
 	if ts-lastAccess > regionCacheTTLSec {
 		return nil
 	}
-	if latestRegion != nil {
+	if !latestRegion.checkSyncFlags(needExpireAfterTTL) {
 		atomic.CompareAndSwapInt64(&latestRegion.lastAccess, atomic.LoadInt64(&latestRegion.lastAccess), ts)
 	}
 	return latestRegion
@@ -1622,26 +1570,6 @@ func (c *RegionCache) GetAllStores() []*Store {
 	})
 }
 
-func filterUnavailablePeers(region *pd.Region) {
-	if len(region.DownPeers) == 0 {
-		return
-	}
-	new := region.Meta.Peers[:0]
-	for _, p := range region.Meta.Peers {
-		available := true
-		for _, downPeer := range region.DownPeers {
-			if p.Id == downPeer.Id && p.StoreId == downPeer.StoreId {
-				available = false
-				break
-			}
-		}
-		if available {
-			new = append(new, p)
-		}
-	}
-	region.Meta.Peers = new
-}
-
 // loadRegion loads region from pd client, and picks the first peer as leader.
 // If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
 // when processing in reverse order.
@@ -1689,7 +1617,6 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool,
 			backoffErr = errors.Errorf("region not found for key %q, encode_key: %q", util.HexRegionKeyStr(key), util.HexRegionKey(c.codec.EncodeRegionKey(key)))
 			continue
 		}
-		filterUnavailablePeers(reg)
 		if len(reg.Meta.Peers) == 0 {
 			return nil, errors.New("receive Region with no available peer")
 		}
@@ -1735,7 +1662,6 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
 		if reg == nil || reg.Meta == nil {
 			return nil, errors.Errorf("region not found for regionID %d", regionID)
 		}
-		filterUnavailablePeers(reg)
 		if len(reg.Meta.Peers) == 0 {
 			return nil, errors.New("receive Region with no available peer")
 		}
@@ -1765,8 +1691,12 @@ func (c *RegionCache) timelyRefreshCache(intervalS uint64) {
 		return
 	}
 	ticker := time.NewTicker(time.Duration(intervalS) * time.Second)
+	c.wg.Add(1)
 	go func() {
-		defer ticker.Stop()
+		defer func() {
+			c.wg.Done()
+			ticker.Stop()
+		}()
 		for {
 			select {
 			case <-c.ctx.Done():
@@ -2156,11 +2086,15 @@ const cleanRegionNumPerRound = 50
 // negligible.
 func (c *RegionCache) cacheGC() {
 	ticker := time.NewTicker(cleanCacheInterval)
-	defer ticker.Stop()
+	defer func() {
+		c.wg.Done()
+		ticker.Stop()
+	}()
 	beginning := newBtreeSearchItem([]byte(""))
 	iterItem := beginning
 	expired := make([]*btreeItem, cleanRegionNumPerRound)
+	remaining := make([]*Region, cleanRegionNumPerRound)
 	for {
 		select {
 		case <-c.ctx.Done():
@@ -2168,6 +2102,7 @@ func (c *RegionCache) cacheGC() {
 		case <-ticker.C:
 			count := 0
 			expired = expired[:0]
+			remaining = remaining[:0]
 
 			// Only RLock when checking TTL to avoid blocking other readers
 			c.mu.RLock()
@@ -2180,6 +2115,8 @@ func (c *RegionCache) cacheGC() {
 				count++
 				if item.cachedRegion.isCacheTTLExpired(ts) {
 					expired = append(expired, item)
+				} else {
+					remaining = append(remaining, item.cachedRegion)
 				}
 				return true
 			})
@@ -2190,6 +2127,7 @@ func (c *RegionCache) cacheGC() {
 				iterItem = beginning
 			}
 
+			// Clean expired regions
 			if len(expired) > 0 {
 				c.mu.Lock()
 				for _, item := range expired {
@@ -2198,6 +2136,30 @@ func (c *RegionCache) cacheGC() {
 				}
 				c.mu.Unlock()
 			}
+
+			// Check remaining regions and update sync flags
+			for _, region := range remaining {
+				syncFlags := region.getSyncFlags()
+				if syncFlags&needDelayedReloadReady > 0 {
+					// the region will be reloaded soon on access
+					continue
+				}
+				if syncFlags&needDelayedReloadPending > 0 {
+					region.setSyncFlags(needDelayedReloadReady)
+					// the region will be reloaded soon on access, no need to check if it needs to be expired
+					continue
+				}
+				if syncFlags&needExpireAfterTTL == 0 {
+					regionStore := region.getStore()
+					for i, store := range regionStore.stores {
+						// if the region has a stale or unreachable store, let it expire after TTL.
+						if atomic.LoadUint32(&store.epoch) != regionStore.storeEpochs[i] || store.getLivenessState() != reachable {
+							region.setSyncFlags(needExpireAfterTTL)
+							break
+						}
+					}
+				}
+			}
 		}
 	}
 }
@@ -3004,7 +2966,10 @@ func (s *Store) markAlreadySlow() {
 // asyncUpdateStoreSlowScore updates the slow score of each store periodically.
 func (c *RegionCache) asyncUpdateStoreSlowScore(interval time.Duration) {
 	ticker := time.NewTicker(interval)
-	defer ticker.Stop()
+	defer func() {
+		c.wg.Done()
+		ticker.Stop()
+	}()
 	for {
 		select {
 		case <-c.ctx.Done():
@@ -3026,13 +2991,13 @@ func (c *RegionCache) checkAndUpdateStoreSlowScores() {
 				zap.Stack("stack trace"))
 		}
 	}()
-	slowScoreMetrics := make(map[string]float64)
+	slowScoreMetrics := make(map[uint64]float64)
 	c.forEachStore(func(store *Store) {
 		store.updateSlowScoreStat()
-		slowScoreMetrics[store.addr] = float64(store.getSlowScore())
+		slowScoreMetrics[store.storeID] = float64(store.getSlowScore())
 	})
 	for store, score := range slowScoreMetrics {
-		metrics.TiKVStoreSlowScoreGauge.WithLabelValues(store).Set(score)
+		metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(score)
 	}
 }
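Editor's note (not part of the patch): the GC sweep above now doubles as the scheduler for delayed reloads — a replica selector marks a region `needDelayedReloadPending`, the next sweep promotes it to `needDelayedReloadReady`, and the next access consumes that bit and reloads by id; regions with a stale-epoch or unreachable store are instead marked `needExpireAfterTTL` so they age out. A compact standalone sketch of that state progression; `gcStep` and its parameters are illustrative, not the library's API:

```go
package main

import "fmt"

const (
	needReloadOnAccess int32 = 1 << iota
	needExpireAfterTTL
	needDelayedReloadPending
	needDelayedReloadReady
)

// gcStep mimics one GC sweep over a single region's flags: pending becomes
// ready, and a region with an unhealthy store is marked to expire after TTL.
func gcStep(flags int32, hasUnhealthyStore bool) int32 {
	switch {
	case flags&needDelayedReloadReady > 0:
		// already ready; the next access will reload it by id
	case flags&needDelayedReloadPending > 0:
		flags |= needDelayedReloadReady
	case flags&needExpireAfterTTL == 0 && hasUnhealthyStore:
		flags |= needExpireAfterTTL
	}
	return flags
}

func main() {
	flags := needDelayedReloadPending // set by a replica selector on a failed follower read
	flags = gcStep(flags, false)      // sweep 1: pending -> ready
	fmt.Println(flags&needDelayedReloadReady > 0)       // true: reload on next access
	fmt.Println(gcStep(0, true)&needExpireAfterTTL > 0) // true: unhealthy store => expire after TTL
}
```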
@@ -3054,7 +3019,10 @@ func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) {
 // asyncReportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
 func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) {
 	ticker := time.NewTicker(interval)
-	defer ticker.Stop()
+	defer func() {
+		c.wg.Done()
+		ticker.Stop()
+	}()
 	for {
 		select {
 		case <-c.ctx.Done():
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index 39f4a1d7ed..2f09239d3c 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -58,6 +58,18 @@ import (
 	uatomic "go.uber.org/atomic"
 )
 
+type inspectedPDClient struct {
+	pd.Client
+	getRegion func(ctx context.Context, cli pd.Client, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error)
+}
+
+func (c *inspectedPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
+	if c.getRegion != nil {
+		return c.getRegion(ctx, c.Client, key, opts...)
+	}
+	return c.Client.GetRegion(ctx, key, opts...)
+}
+
 func TestRegionCache(t *testing.T) {
 	suite.Run(t, new(testRegionCacheSuite))
 }
@@ -73,6 +85,7 @@ type testRegionCacheSuite struct {
 	region1  uint64
 	cache    *RegionCache
 	bo       *retry.Backoffer
+	onClosed func()
 }
 
 func (s *testRegionCacheSuite) SetupTest() {
@@ -92,6 +105,9 @@ func (s *testRegionCacheSuite) SetupTest() {
 func (s *testRegionCacheSuite) TearDownTest() {
 	s.cache.Close()
 	s.mvccStore.Close()
+	if s.onClosed != nil {
+		s.onClosed()
+	}
 }
 
 func (s *testRegionCacheSuite) storeAddr(id uint64) string {
@@ -293,7 +309,66 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
 	s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress(), storeMeta.GetLabels()...)
 }
 
-func (s *testRegionCacheSuite) TestTiFlashDownPeersAndAsyncReload() {
+func (s *testRegionCacheSuite) TestNeedExpireRegionAfterTTL() {
+	s.onClosed = func() { SetRegionCacheTTLSec(600) }
+	SetRegionCacheTTLSec(2)
+
+	cntGetRegion := 0
+	s.cache.pdClient = &inspectedPDClient{
+		Client: s.cache.pdClient,
+		getRegion: func(ctx context.Context, cli pd.Client, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
+			cntGetRegion++
+			return cli.GetRegion(ctx, key, opts...)
+		},
+	}
+
+	s.Run("WithDownPeers", func() {
+		cntGetRegion = 0
+		s.cache.clear()
+		s.cluster.MarkPeerDown(s.peer2)
+
+		for i := 0; i < 50; i++ {
+			time.Sleep(100 * time.Millisecond)
+			_, err := s.cache.LocateKey(s.bo, []byte("a"))
+			s.NoError(err)
+		}
+		s.Equal(2, cntGetRegion, "should reload region with down peers every RegionCacheTTL")
+	})
+
+	s.Run("WithStaleStores", func() {
+		cntGetRegion = 0
+		s.cache.clear()
+		store2 := s.cache.getStoreOrInsertDefault(s.store2)
+
+		for i := 0; i < 50; i++ {
+			atomic.StoreUint32(&store2.epoch, uint32(i))
+			time.Sleep(100 * time.Millisecond)
+			_, err := s.cache.LocateKey(s.bo, []byte("a"))
+			s.NoError(err)
+		}
+		s.Equal(2, cntGetRegion, "should reload region with stale stores every RegionCacheTTL")
+	})
+
+	s.Run("WithUnreachableStores", func() {
+		cntGetRegion = 0
+		s.cache.clear()
+		store2 := s.cache.getStoreOrInsertDefault(s.store2)
+		atomic.StoreUint32(&store2.livenessState, uint32(unreachable))
+		defer atomic.StoreUint32(&store2.livenessState, uint32(reachable))
+
+		for i := 0; i < 50; i++ {
+			time.Sleep(100 * time.Millisecond)
+			_, err := s.cache.LocateKey(s.bo, []byte("a"))
+			s.NoError(err)
+		}
+		s.Equal(2, cntGetRegion, "should reload region with unreachable stores every RegionCacheTTL")
+	})
+}
+
+func (s *testRegionCacheSuite) TestTiFlashRecoveredFromDown() {
+	s.onClosed = func() { SetRegionCacheTTLSec(600) }
+	SetRegionCacheTTLSec(3)
+
 	store3 := s.cluster.AllocID()
 	peer3 := s.cluster.AllocID()
 	s.cluster.AddStore(store3, s.storeAddr(store3))
@@ -313,34 +388,45 @@
 	s.Nil(err)
 	s.NotNil(ctx)
 	region := s.cache.GetCachedRegionWithRLock(loc.Region)
-	s.Equal(region.hasUnavailableTiFlashStore, false)
-	s.Equal(region.asyncReload.Load(), false)
+	s.Equal(region.checkSyncFlags(needExpireAfterTTL), false)
 
 	s.cache.clear()
 	s.cluster.MarkPeerDown(peer3)
-	s.cache.reloadRegion(loc.Region.id)
 	loc, err = s.cache.LocateKey(s.bo, []byte("a"))
 	s.Nil(err)
 	s.Equal(loc.Region.id, s.region1)
 
 	region = s.cache.GetCachedRegionWithRLock(loc.Region)
-	s.Equal(region.hasUnavailableTiFlashStore, true)
-	s.Equal(region.asyncReload.Load(), false)
+	s.Equal(region.checkSyncFlags(needExpireAfterTTL), true)
 
-	SetRegionCacheTTLSec(3)
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for i := 0; i <= 3; i++ {
-			s.cache.GetTiFlashRPCContext(s.bo, loc.Region, true, LabelFilterNoTiFlashWriteNode)
-			time.Sleep(1 * time.Second)
+	for i := 0; i <= 3; i++ {
+		time.Sleep(1 * time.Second)
+		loc, err = s.cache.LocateKey(s.bo, []byte("a"))
+		s.Nil(err)
+		rpcCtx, err := s.cache.GetTiFlashRPCContext(s.bo, loc.Region, true, LabelFilterNoTiFlashWriteNode)
+		s.Nil(err)
+		if rpcCtx != nil {
+			s.NotEqual(s.storeAddr(store3), rpcCtx.Addr, "should not access peer3 when it is down")
 		}
-	}()
-	wg.Wait()
-	s.cache.GetTiFlashRPCContext(s.bo, loc.Region, true, LabelFilterNoTiFlashWriteNode)
-	s.Equal(region.hasUnavailableTiFlashStore, true)
-	s.Equal(region.asyncReload.Load(), true)
+	}
+	newRegion := s.cache.GetCachedRegionWithRLock(loc.Region)
+	s.NotNil(newRegion)
+	s.NotEqual(region, newRegion)
+
+	s.cluster.RemoveDownPeer(peer3)
+	for i := 0; ; i++ {
+		if i > 10 {
+			s.Fail("should access peer3 after it is up")
+			break
+		}
+		loc, err = s.cache.LocateKey(s.bo, []byte("a"))
+		s.Nil(err)
+		rpcCtx, err := s.cache.GetTiFlashRPCContext(s.bo, loc.Region, true, LabelFilterNoTiFlashWriteNode)
+		s.Nil(err)
+		if rpcCtx != nil && rpcCtx.Addr == s.storeAddr(store3) {
+			break
+		}
+		time.Sleep(1 * time.Second)
+	}
 }
 
 // TestFilterDownPeersOrPeersOnTombstoneOrDroppedStore verifies the RegionCache filter
@@ -1306,7 +1392,6 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
 		Meta:      cpMeta,
 		DownPeers: []*metapb.Peer{{Id: s.peer1, StoreId: s.store1}},
 	}
-	filterUnavailablePeers(cpRegion)
 	region, err := newRegion(s.bo, s.cache, cpRegion)
 	s.Nil(err)
 	s.cache.insertRegionToCache(region, true, true)
@@ -1511,7 +1596,7 @@ func (s *testRegionCacheSuite) TestBuckets() {
 	// 2. insertRegionToCache keeps old buckets information if needed.
 	fakeRegion := &Region{
 		meta:          cachedRegion.meta,
-		syncFlag:      cachedRegion.syncFlag,
+		syncFlags:     cachedRegion.syncFlags,
 		lastAccess:    cachedRegion.lastAccess,
 		invalidReason: cachedRegion.invalidReason,
 	}
@@ -1920,7 +2005,7 @@ func (s *testRegionCacheWithDelaySuite) TestInsertStaleRegion() {
 	s.NoError(err)
 	fakeRegion := &Region{
 		meta:          r.meta,
-		syncFlag:      r.syncFlag,
+		syncFlags:     r.syncFlags,
 		lastAccess:    r.lastAccess,
 		invalidReason: r.invalidReason,
 	}
@@ -2053,11 +2138,10 @@ func BenchmarkInsertRegionToCache(b *testing.B) {
 			},
 		}
 		rs := &regionStore{
-			workTiKVIdx:              0,
-			proxyTiKVIdx:             -1,
-			stores:                   make([]*Store, 0, len(r.meta.Peers)),
-			pendingTiFlashPeerStores: map[uint64]uint64{},
-			storeEpochs:              make([]uint32, 0, len(r.meta.Peers)),
+			workTiKVIdx:  0,
+			proxyTiKVIdx: -1,
+			stores:       make([]*Store, 0, len(r.meta.Peers)),
+			storeEpochs:  make([]uint32, 0, len(r.meta.Peers)),
 		}
 		r.setStore(rs)
 		b.StartTimer()
@@ -2091,11 +2175,10 @@ func BenchmarkInsertRegionToCache2(b *testing.B) {
 			},
 		}
 		rs := &regionStore{
-			workTiKVIdx:              0,
-			proxyTiKVIdx:             -1,
-			stores:                   make([]*Store, 0, len(r.meta.Peers)),
-			pendingTiFlashPeerStores: map[uint64]uint64{},
-			storeEpochs:              make([]uint32, 0, len(r.meta.Peers)),
+			workTiKVIdx:  0,
+			proxyTiKVIdx: -1,
+			stores:       make([]*Store, 0, len(r.meta.Peers)),
+			storeEpochs:  make([]uint32, 0, len(r.meta.Peers)),
 		}
 		r.setStore(rs)
 		b.StartTimer()
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index ebb5c7dfb5..d839d53422 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -607,7 +607,7 @@ func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (
 	if candidateNum == 0 {
 		metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
 		selector.invalidateReplicaStore(leader, errors.Errorf("all followers are tried as proxy but fail"))
-		selector.region.scheduleReload()
+		selector.region.setSyncFlags(needReloadOnAccess)
 		return nil, nil
 	}
 
@@ -727,7 +727,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 		}
 	}
 	if reloadRegion {
-		selector.regionCache.scheduleReloadRegion(selector.region)
+		selector.region.setSyncFlags(needDelayedReloadPending)
 	}
 	// If there is no candidate, fallback to the leader.
 	if selector.targetIdx < 0 {
@@ -760,7 +760,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 			labels: state.option.labels,
 		}
 		if leaderEpochStale {
-			selector.regionCache.scheduleReloadRegion(selector.region)
+			selector.region.setSyncFlags(needDelayedReloadPending)
 		}
 		return nil, stateChanged{}
 	}
@@ -918,7 +918,7 @@ func newReplicaSelector(
 	cachedRegion := regionCache.GetCachedRegionWithRLock(regionID)
 	if cachedRegion == nil {
 		return nil, errors.New("cached region not found")
-	} else if cachedRegion.checkNeedReload() {
+	} else if cachedRegion.checkSyncFlags(needReloadOnAccess) {
 		return nil, errors.New("cached region need reload")
 	} else if !cachedRegion.checkRegionCacheTTL(time.Now().Unix()) {
 		return nil, errors.New("cached region ttl expired")
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index db83000221..b2562b4858 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -77,6 +77,7 @@ type testRegionRequestToThreeStoresSuite struct {
 	bo                  *retry.Backoffer
 	regionRequestSender *RegionRequestSender
 	mvccStore           mocktikv.MVCCStore
+	onClosed            func()
 }
 
 func (s *testRegionRequestToThreeStoresSuite) SetupTest() {
@@ -93,6 +94,9 @@ func (s *testRegionRequestToThreeStoresSuite) SetupTest() {
 func (s *testRegionRequestToThreeStoresSuite) TearDownTest() {
 	s.cache.Close()
 	s.mvccStore.Close()
+	if s.onClosed != nil {
+		s.onClosed()
+	}
 }
 
 func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() {
@@ -1841,3 +1845,46 @@ func (s *testRegionRequestToThreeStoresSuite) TestLeaderStuck() {
 	s.Less(elapsed, time.Millisecond*2500)
 	s.True(requestHandled)
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestTiKVRecoveredFromDown() {
+	s.onClosed = func() { SetRegionCacheTTLSec(600) }
+	SetRegionCacheTTLSec(2)
+
+	bo := retry.NewBackoffer(context.Background(), -1)
+	key := []byte("key")
+
+	req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadMixed, nil)
+	req.ReadReplicaScope = oracle.GlobalTxnScope
+	req.TxnScope = oracle.GlobalTxnScope
+
+	downStore := s.cluster.GetStore(s.storeIDs[2])
+	s.cluster.MarkPeerDown(s.peerIDs[2])
+	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
+		s.Require().NotEqual(addr, downStore.Address)
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil
+	}}
+	for i := 0; i < 15; i++ {
+		time.Sleep(200 * time.Millisecond)
+		loc, err := s.cache.LocateKey(bo, key)
+		s.Require().Nil(err)
+		resp, rpcCtx, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(downStore.Labels))
+		s.Require().Nil(err)
+		s.Require().Equal(rpcCtx.Addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value), "should access other peers")
+	}
+
+	s.cluster.RemoveDownPeer(s.peerIDs[2])
+	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil
+	}}
+	for i := 0; i < 15; i++ {
+		time.Sleep(200 * time.Millisecond)
+		loc, err := s.cache.LocateKey(bo, key)
+		s.Require().Nil(err)
+		_, rpcCtx, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(downStore.Labels))
+		s.Require().Nil(err)
+		if rpcCtx.Addr == downStore.Address {
+			return
+		}
+	}
+	s.Require().Fail("should access recovered peer after region reloading within RegionCacheTTL")
+}
diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go
index 3636355e23..f4a9881b71 100644
--- a/internal/locate/region_request_state_test.go
+++ b/internal/locate/region_request_state_test.go
@@ -250,13 +250,10 @@ func (s *testRegionCacheStaleReadSuite) setTimeout(id uint64) { //nolint: unused
 }
 
 func TestRegionCacheStaleRead(t *testing.T) {
-	originReloadRegionInterval := atomic.LoadInt64(&reloadRegionInterval)
 	originBoTiKVServerBusy := retry.BoTiKVServerBusy
 	defer func() {
-		atomic.StoreInt64(&reloadRegionInterval, originReloadRegionInterval)
 		retry.BoTiKVServerBusy = originBoTiKVServerBusy
 	}()
-	atomic.StoreInt64(&reloadRegionInterval, int64(24*time.Hour)) // disable reload region
 	retry.BoTiKVServerBusy = retry.NewConfig("tikvServerBusy", &metrics.BackoffHistogramServerBusy, retry.NewBackoffFnCfg(2, 10, retry.EqualJitter), tikverr.ErrTiKVServerBusy)
 	regionCacheTestCases := []RegionCacheTestCase{
 		{
@@ -581,14 +578,7 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon
 			return
 		}
 
-		s.cache.regionsNeedReload.Lock()
-		if *asyncReload {
-			s.Len(s.cache.regionsNeedReload.regions, 1)
-			s.Equal(s.cache.regionsNeedReload.regions[0], s.regionID)
-		} else {
-			s.Empty(s.cache.regionsNeedReload.regions)
-		}
-		s.cache.regionsNeedReload.Unlock()
+		s.Equal(*asyncReload, region.checkSyncFlags(needDelayedReloadPending))
 	}()
 
 	bo := retry.NewBackoffer(ctx, -1)