From 83165bd56d6fc6a939b8d67fde799998335d2409 Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 26 Sep 2023 14:40:23 +0800 Subject: [PATCH] Cherry pick some patches to 7.1 (#940) * reload region cache when store is resolved from invalid status (#843) Signed-off-by: you06 Co-authored-by: disksing * fallback to follower when leader is busy (#916) (#923) * fallback to follower when leader is busy Signed-off-by: you06 Co-authored-by: cfzjywxk Co-authored-by: cfzjywxk * Resume max retry time check for stale read retry with leader option(#903) (#911) * Resume max retry time check for stale read retry with leader option Signed-off-by: cfzjywxk * add cancel Signed-off-by: cfzjywxk --------- Signed-off-by: cfzjywxk * add region cache state test & fix some issues of replica selector (#910) Signed-off-by: you06 remove duplicate code Signed-off-by: you06 * enable workflow for tidb-7.1 Signed-off-by: you06 * update Signed-off-by: you06 update Signed-off-by: you06 fix test Signed-off-by: you06 fix test Signed-off-by: you06 * lint Signed-off-by: you06 * lint Signed-off-by: you06 * fix flaky test Signed-off-by: you06 --------- Signed-off-by: you06 Signed-off-by: cfzjywxk Co-authored-by: disksing Co-authored-by: cfzjywxk Co-authored-by: cfzjywxk --- .github/workflows/integration.yml | 4 +- .github/workflows/test.yml | 4 +- error/error.go | 2 +- internal/locate/region_cache.go | 95 +- internal/locate/region_cache_test.go | 18 +- internal/locate/region_request.go | 181 +++- internal/locate/region_request3_test.go | 160 +++- internal/locate/region_request_state_test.go | 841 ++++++++++++++++++ internal/locate/region_request_test.go | 4 +- internal/mockstore/mocktikv/cluster.go | 4 +- .../mockstore/mocktikv/cluster_manipulate.go | 26 + 11 files changed, 1269 insertions(+), 70 deletions(-) create mode 100644 internal/locate/region_request_state_test.go diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 6a17098da..89014b278 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -2,9 +2,9 @@ name: Integration Test on: push: - branches: [ master ] + branches: [ master, tidb-7.1 ] pull_request: - branches: [ master ] + branches: [ master, tidb-7.1 ] jobs: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6ad905074..27a558dcc 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,9 +2,9 @@ name: Unit Test on: push: - branches: [ master ] + branches: [ master, tidb-7.1 ] pull_request: - branches: [ master ] + branches: [ master, tidb-7.1 ] jobs: test: diff --git a/error/error.go b/error/error.go index 761ebef6d..62a75ff65 100644 --- a/error/error.go +++ b/error/error.go @@ -246,7 +246,7 @@ type ErrAssertionFailed struct { *kvrpcpb.AssertionFailed } -// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues“ is not. +// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues` is not. type ErrLockOnlyIfExistsNoReturnValue struct { StartTS uint64 ForUpdateTs uint64 diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index ec96b71b1..e60efe64d 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -153,6 +153,7 @@ type Region struct { syncFlag int32 // region need be sync in next turn lastAccess int64 // last region access time, see checkRegionCacheTTL invalidReason InvalidReason // the reason why the region is invalidated + asyncReload atomic.Bool // the region need to be reloaded in async mode } // AccessIndex represent the index for accessIndex array @@ -411,7 +412,7 @@ func newRegionIndexMu(rs []*Region) *regionIndexMu { r.latestVersions = make(map[uint64]RegionVerID) r.sorted = NewSortedRegions(btreeDegree) for _, region := range rs { - r.insertRegionToCache(region) + r.insertRegionToCache(region, true) } return r } @@ -457,6 +458,11 @@ type RegionCache struct { // requestLiveness always returns unreachable. mockRequestLiveness atomic.Pointer[livenessFunc] } + + regionsNeedReload struct { + sync.Mutex + regions []uint64 + } } // NewRegionCache creates a RegionCache. @@ -510,8 +516,8 @@ func (c *RegionCache) clear() { } // thread unsafe, should use with lock -func (c *RegionCache) insertRegionToCache(cachedRegion *Region) { - c.mu.insertRegionToCache(cachedRegion) +func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) { + c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion) } // Close releases region cache's resource. @@ -519,11 +525,18 @@ func (c *RegionCache) Close() { c.cancelFunc() } +var reloadRegionInterval = int64(10 * time.Second) + // asyncCheckAndResolveLoop with func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { ticker := time.NewTicker(interval) - defer ticker.Stop() + reloadRegionTicker := time.NewTicker(time.Duration(atomic.LoadInt64(&reloadRegionInterval))) + defer func() { + ticker.Stop() + reloadRegionTicker.Stop() + }() var needCheckStores []*Store + reloadNextLoop := make(map[uint64]struct{}) for { needCheckStores = needCheckStores[:0] select { @@ -541,6 +554,21 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { // there's a deleted store in the stores map which guaranteed by reReslve(). return state != unresolved && state != tombstone && state != deleted }) + case <-reloadRegionTicker.C: + for regionID := range reloadNextLoop { + c.reloadRegion(regionID) + delete(reloadNextLoop, regionID) + } + c.regionsNeedReload.Lock() + for _, regionID := range c.regionsNeedReload.regions { + // will reload in next tick, wait a while for two reasons: + // 1. there may an unavailable duration while recreating the connection. + // 2. the store may just be started, and wait safe ts synced to avoid the + // possible dataIsNotReady error. + reloadNextLoop[regionID] = struct{}{} + } + c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0] + c.regionsNeedReload.Unlock() } } } @@ -1043,7 +1071,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID()) r = lr c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() } else if r.checkNeedReloadAndMarkUpdated() { // load region when it be marked as need reload. @@ -1056,7 +1084,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID()) r = lr c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() } } @@ -1197,7 +1225,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K } else { r = lr c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() } } @@ -1216,7 +1244,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K } c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() return &KeyLocation{ Region: r.VerID(), @@ -1226,6 +1254,38 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K }, nil } +func (c *RegionCache) scheduleReloadRegion(region *Region) { + if region == nil || !region.asyncReload.CompareAndSwap(false, true) { + // async reload scheduled by other thread. + return + } + regionID := region.GetID() + if regionID > 0 { + c.regionsNeedReload.Lock() + c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, regionID) + c.regionsNeedReload.Unlock() + } +} + +func (c *RegionCache) reloadRegion(regionID uint64) { + bo := retry.NewNoopBackoff(context.Background()) + lr, err := c.loadRegionByID(bo, regionID) + if err != nil { + // ignore error and use old region info. + logutil.Logger(bo.GetCtx()).Error("load region failure", + zap.Uint64("regionID", regionID), zap.Error(err)) + c.mu.RLock() + if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil { + oldRegion.asyncReload.Store(false) + } + c.mu.RUnlock() + return + } + c.mu.Lock() + c.insertRegionToCache(lr, false) + c.mu.Unlock() +} + // GroupKeysByRegion separates keys into groups by their belonging Regions. // Specially it also returns the first key's region which may be used as the // 'PrimaryLockKey' and should be committed ahead of others. @@ -1310,7 +1370,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey // TODO(youjiali1995): scanRegions always fetch regions from PD and these regions don't contain buckets information // for less traffic, so newly inserted regions in region cache don't have buckets information. We should improve it. for _, region := range regions { - c.insertRegionToCache(region) + c.insertRegionToCache(region, true) } return @@ -1384,7 +1444,9 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin // insertRegionToCache tries to insert the Region to cache. // It should be protected by c.mu.l.Lock(). -func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) { +// if `invalidateOldRegion` is false, the old region cache should be still valid, +// and it may still be used by some kv requests. +func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) { oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion) if oldRegion != nil { store := cachedRegion.getStore() @@ -1399,8 +1461,11 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) { if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader { store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly)) } - // Invalidate the old region in case it's not invalidated and some requests try with the stale region information. - oldRegion.invalidate(Other) + // If the old region is still valid, do not invalidate it to avoid unnecessary backoff. + if invalidateOldRegion { + // Invalidate the old region in case it's not invalidated and some requests try with the stale region information. + oldRegion.invalidate(Other) + } // Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which // is under transferring regions. store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load()) @@ -1919,7 +1984,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext c.mu.Lock() for _, region := range newRegions { - c.insertRegionToCache(region) + c.insertRegionToCache(region, true) } c.mu.Unlock() @@ -2037,7 +2102,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV return } c.mu.Lock() - c.insertRegionToCache(new) + c.insertRegionToCache(new, true) c.mu.Unlock() }() } @@ -2525,8 +2590,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { } func (s *Store) getResolveState() resolveState { - var state resolveState if s == nil { + var state resolveState return state } return resolveState(atomic.LoadUint64(&s.state)) diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 03ef3e309..619da2d2e 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -966,7 +966,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() { region := createSampleRegion([]byte("k1"), []byte("k2")) region.meta.Id = 1 region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10} - cache.insertRegionToCache(region) + cache.insertRegionToCache(region, true) r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}} r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}} @@ -1257,7 +1257,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() { filterUnavailablePeers(cpRegion) region, err := newRegion(s.bo, s.cache, cpRegion) s.Nil(err) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) // OnSendFail should not panic s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail")) @@ -1293,7 +1293,7 @@ func (s *testRegionCacheSuite) TestPeersLenChangedByWitness() { cpRegion := &pd.Region{Meta: cpMeta} region, err := newRegion(s.bo, s.cache, cpRegion) s.Nil(err) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) // OnSendFail should not panic s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail")) @@ -1466,12 +1466,12 @@ func (s *testRegionCacheSuite) TestBuckets() { fakeRegion.setStore(cachedRegion.getStore().clone()) // no buckets fakeRegion.getStore().buckets = nil - s.cache.insertRegionToCache(fakeRegion) + s.cache.insertRegionToCache(fakeRegion, true) cachedRegion = s.getRegion([]byte("a")) s.Equal(defaultBuckets, cachedRegion.getStore().buckets) // stale buckets fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1} - s.cache.insertRegionToCache(fakeRegion) + s.cache.insertRegionToCache(fakeRegion, true) cachedRegion = s.getRegion([]byte("a")) s.Equal(defaultBuckets, cachedRegion.getStore().buckets) // new buckets @@ -1481,7 +1481,7 @@ func (s *testRegionCacheSuite) TestBuckets() { Keys: buckets.Keys, } fakeRegion.getStore().buckets = newBuckets - s.cache.insertRegionToCache(fakeRegion) + s.cache.insertRegionToCache(fakeRegion, true) cachedRegion = s.getRegion([]byte("a")) s.Equal(newBuckets, cachedRegion.getStore().buckets) @@ -1614,7 +1614,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() { region, err := s.cache.loadRegion(s.bo, []byte("c"), false) s.Nil(err) s.Equal(region.GetID(), regions[0]) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) loc, err = s.cache.LocateKey(s.bo, []byte{'c'}) s.Nil(err) s.Equal(loc.Region.GetID(), regions[0]) @@ -1625,7 +1625,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() { region, err = s.cache.loadRegion(s.bo, []byte("e"), false) s.Nil(err) s.Equal(region.GetID(), regions[0]) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) loc, err = s.cache.LocateKey(s.bo, []byte{'e'}) s.Nil(err) s.Equal(loc.Region.GetID(), regions[0]) @@ -1739,7 +1739,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() { v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} st := &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}) + s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true) r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10) s.Equal(len(r), 2) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 2c478a5d3..176850cda 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -345,7 +345,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec // a request. So, before the new leader is elected, we should not send requests // to the unreachable old leader to avoid unnecessary timeout. if liveness != reachable || leader.isExhausted(maxReplicaAttempt) { - selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx} + selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromOnNotLeader: true} return nil, stateChanged{} } if selector.busyThreshold > 0 { @@ -369,7 +369,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep return } if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) { - selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx} + selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromOnNotLeader: true} } if liveness != reachable { selector.invalidateReplicaStore(selector.targetReplica(), cause) @@ -377,7 +377,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep } func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) { - selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx} + selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromOnNotLeader: true} } // tryFollower is the state where we cannot access the known leader @@ -391,36 +391,77 @@ type tryFollower struct { stateBase leaderIdx AccessIndex lastIdx AccessIndex + // fromOnNotLeader indicates whether the state is changed from onNotLeader. + fromOnNotLeader bool + labels []*metapb.StoreLabel } func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { - var targetReplica *replica - // Search replica that is not attempted from the last accessed replica - for i := 1; i < len(selector.replicas); i++ { - idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas)) - if idx == state.leaderIdx { - continue + //hasDeadlineExceededErr || targetReplica.deadlineErrUsingConfTimeout + filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) { + for i := 0; i < len(selector.replicas); i++ { + idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas)) + if idx == state.leaderIdx { + continue + } + selectReplica := selector.replicas[idx] + if selectReplica.store.getLivenessState() != unreachable && fn(selectReplica) { + return idx, selectReplica + } } - targetReplica = selector.replicas[idx] - // Each follower is only tried once - if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable { + return -1, nil + } + + if len(state.labels) > 0 { + idx, selectReplica := filterReplicas(func(selectReplica *replica) bool { + return selectReplica.store.IsLabelsMatch(state.labels) + }) + if selectReplica != nil && idx >= 0 { + state.lastIdx = idx + selector.targetIdx = idx + } + // labels only take effect for first try. + state.labels = nil + } + + if selector.targetIdx < 0 { + // Search replica that is not attempted from the last accessed replica + idx, selectReplica := filterReplicas(func(selectReplica *replica) bool { + return !selectReplica.isExhausted(1) + }) + if selectReplica != nil && idx >= 0 { state.lastIdx = idx selector.targetIdx = idx - break } } + // If all followers are tried and fail, backoff and retry. if selector.targetIdx < 0 { metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() selector.invalidateRegion() return nil, nil } - return selector.buildRPCContext(bo) + rpcCtx, err := selector.buildRPCContext(bo) + if err != nil || rpcCtx == nil { + return rpcCtx, err + } + if !state.fromOnNotLeader { + replicaRead := true + rpcCtx.contextPatcher.replicaRead = &replicaRead + } + staleRead := false + rpcCtx.contextPatcher.staleRead = &staleRead + return rpcCtx, nil } func (state *tryFollower) onSendSuccess(selector *replicaSelector) { - if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) { - panic("the store must exist") + if state.fromOnNotLeader { + peer := selector.targetReplica().peer + if !selector.region.switchWorkLeaderToPeer(peer) { + logutil.BgLogger().Warn("the store must exist", + zap.Uint64("store", peer.StoreId), + zap.Uint64("peer", peer.Id)) + } } } @@ -542,6 +583,10 @@ type accessFollower struct { learnerOnly bool } +// Follower read will try followers first, if no follower is available, it will fallback to leader. +// Specially, for stale read, it tries local peer(can be either leader or follower), then use snapshot read in the leader, +// if the leader read receive server-is-busy and connection errors, the region cache is still valid, +// and the state will be changed to tryFollower, which will read by replica read. func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { replicaSize := len(selector.replicas) resetStaleRead := false @@ -573,23 +618,45 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector if state.option.preferLeader { state.lastIdx = state.leaderIdx } + var offset int + if state.lastIdx >= 0 { + offset = rand.Intn(replicaSize) + } + reloadRegion := false for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { - idx := AccessIndex((int(state.lastIdx) + i) % replicaSize) - // If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader - // as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution. - for cnt := 0; cnt < replicaSize && !state.isCandidate(idx, selector.replicas[idx]); cnt++ { - idx = AccessIndex((int(idx) + rand.Intn(replicaSize)) % replicaSize) + var idx AccessIndex + if state.option.preferLeader { + if i == 0 { + idx = state.lastIdx + } else { + // randomly select next replica, but skip state.lastIdx + if (i+offset)%replicaSize == 0 { + offset++ + } + } + } else { + idx = AccessIndex((int(state.lastIdx) + i) % replicaSize) } - if state.isCandidate(idx, selector.replicas[idx]) { + selectReplica := selector.replicas[idx] + if state.isCandidate(idx, selectReplica) { state.lastIdx = idx selector.targetIdx = idx break } + if selectReplica.isEpochStale() && + selectReplica.store.getResolveState() == resolved && + selectReplica.store.getLivenessState() == reachable { + reloadRegion = true + } + } + if reloadRegion { + selector.regionCache.scheduleReloadRegion(selector.region) } // If there is no candidate, fallback to the leader. if selector.targetIdx < 0 { leader := selector.replicas[state.leaderIdx] - leaderInvalid := leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) + leaderEpochStale := leader.isEpochStale() + leaderInvalid := leaderEpochStale || state.IsLeaderExhausted(leader) if len(state.option.labels) > 0 { logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels", zap.Uint64("region", selector.region.GetID()), @@ -597,6 +664,20 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector zap.Any("labels", state.option.labels)) } if leaderInvalid { + // In stale-read, the request will fallback to leader after the local follower failure. + // If the leader is also unavailable, we can fallback to the follower and use replica-read flag again, + // The remote follower not tried yet, and the local follower can retry without stale-read flag. + if state.isStaleRead { + selector.state = &tryFollower{ + leaderIdx: state.leaderIdx, + lastIdx: state.leaderIdx, + labels: state.option.labels, + } + if leaderEpochStale { + selector.regionCache.scheduleReloadRegion(selector.region) + } + return nil, stateChanged{} + } metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() selector.invalidateRegion() return nil, nil @@ -623,6 +704,19 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector return rpcCtx, nil } +func (state *accessFollower) IsLeaderExhausted(leader *replica) bool { + // Allow another extra retry for the following case: + // 1. The stale read is enabled and leader peer is selected as the target peer at first. + // 2. Data is not ready is returned from the leader peer. + // 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer. + // 4. The leader peer should be retried again using snapshot read. + if state.isStaleRead && state.option.leaderOnly { + return leader.isExhausted(2) + } else { + return leader.isExhausted(1) + } +} + func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { if selector.checkLiveness(bo, selector.targetReplica()) != reachable { selector.invalidateReplicaStore(selector.targetReplica(), cause) @@ -634,23 +728,25 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable { return false } - // The request can only be sent to the leader. - if state.option.leaderOnly && idx == state.leaderIdx { - return true + if state.option.leaderOnly { + // The request can only be sent to the leader. + return idx == state.leaderIdx } - // Choose a replica with matched labels. - followerCandidate := !state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && - replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner) - if !followerCandidate { + if !state.tryLeader && idx == state.leaderIdx { + // The request cannot be sent to leader. return false } + if state.learnerOnly { + // The request can only be sent to the learner. + return replica.peer.Role == metapb.PeerRole_Learner + } // And If the leader store is abnormal to be accessed under `ReplicaReadPreferLeader` mode, we should choose other valid followers // as candidates to serve the Read request. if state.option.preferLeader && replica.store.isSlow() { return false } - // If the stores are limited, check if the store is in the list. - return replica.store.IsStoreMatch(state.option.stores) + // Choose a replica with matched labels. + return replica.store.IsStoreMatch(state.option.stores) && replica.store.IsLabelsMatch(state.option.labels) } // tryIdleReplica is the state where we find the leader is busy and retry the request using replica read. @@ -1030,6 +1126,9 @@ func (s *replicaSelector) onServerIsBusy( // Mark the server is busy (the next incoming READs could be redirect // to expected followers. ) ctx.Store.markAlreadySlow() + if s.canFallback2Follower() { + return true, nil + } } err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) if err != nil { @@ -1038,6 +1137,23 @@ func (s *replicaSelector) onServerIsBusy( return true, nil } +// For some reasons, the leader is unreachable by now, try followers instead. +// the state is changed in accessFollower.next when leader is unavailable. +func (s *replicaSelector) canFallback2Follower() bool { + if s == nil || s.state == nil { + return false + } + state, ok := s.state.(*accessFollower) + if !ok { + return false + } + if !state.isStaleRead { + return false + } + // can fallback to follower only when the leader is exhausted. + return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx]) +} + func (s *replicaSelector) invalidateRegion() { if s.region != nil { s.region.invalidate(Other) @@ -1865,6 +1981,7 @@ func (s *RegionRequestSender) onRegionError( } // This peer is removed from the region. Invalidate the region since it's too stale. + // if the region error is from follower, can we mark the peer unavailable and reload region asynchronously? if regionErr.GetRegionNotFound() != nil { s.regionCache.InvalidateCachedRegion(ctx.Region) return false, nil diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 236e7b56f..4757a60a6 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -37,6 +37,7 @@ package locate import ( "context" "fmt" + "strconv" "sync/atomic" "testing" "time" @@ -328,7 +329,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { cache := NewRegionCache(s.cache.pdClient) defer cache.Close() cache.mu.Lock() - cache.insertRegionToCache(region) + cache.insertRegionToCache(region, true) cache.mu.Unlock() // Test accessFollower state with kv.ReplicaReadLearner request type. @@ -344,7 +345,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { atomic.StoreInt64(®ion.lastAccess, time.Now().Unix()) rpcCtx, err := replicaSelector.next(s.bo) s.Nil(err) - // Should swith to the next follower. + // Should switch to the next follower. s.Equal(AccessIndex(tikvLearnerAccessIdx), accessLearner.lastIdx) AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil) } @@ -379,7 +380,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { cache := NewRegionCache(s.cache.pdClient) defer cache.Close() cache.mu.Lock() - cache.insertRegionToCache(region) + cache.insertRegionToCache(region, true) cache.mu.Unlock() // Verify creating the replicaSelector. @@ -586,7 +587,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ { rpcCtx, err := replicaSelector.next(s.bo) s.Nil(err) - // Should swith to the next follower. + // Should switch to the next follower. s.NotEqual(lastIdx, state3.lastIdx) // Shouldn't access the leader if followers aren't exhausted. s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx) @@ -1096,7 +1097,8 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg req.EnableStaleRead() req.ReplicaReadType = kv.ReplicaReadFollower - ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() bo := retry.NewBackoffer(ctx, -1) s.Nil(err) resp, retry, err := s.regionRequestSender.SendReq(bo, req, regionLoc.Region, time.Second) @@ -1188,3 +1190,151 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() s.Equal(0, retryTimes) } } + +func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() { + leaderStore, _ := s.loadAndGetLeaderStore() + leaderLabel := []*metapb.StoreLabel{ + { + Key: "id", + Value: strconv.FormatUint(leaderStore.StoreID(), 10), + }, + } + var followerID *uint64 + for _, storeID := range s.storeIDs { + if storeID != leaderStore.storeID { + id := storeID + followerID = &id + break + } + } + s.NotNil(followerID) + followerLabel := []*metapb.StoreLabel{ + { + Key: "id", + Value: strconv.FormatUint(*followerID, 10), + }, + } + + regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + s.NotNil(regionLoc) + + dataIsNotReady := false + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + select { + case <-ctx.Done(): + return nil, errors.New("timeout") + default: + } + if dataIsNotReady && req.StaleRead { + dataIsNotReady = false + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + DataIsNotReady: &errorpb.DataIsNotReady{}, + }}}, nil + } + if addr == leaderStore.addr { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + ServerIsBusy: &errorpb.ServerIsBusy{}, + }}}, nil + } + if !req.ReplicaRead { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + NotLeader: &errorpb.NotLeader{}, + }}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil + }} + + for _, localLeader := range []bool{true, false} { + dataIsNotReady = true + // data is not ready, then server is busy in the first round, + // directly server is busy in the second round. + for i := 0; i < 2; i++ { + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + req.EnableStaleRead() + req.ReplicaReadType = kv.ReplicaReadMixed + var ops []StoreSelectorOption + if localLeader { + ops = append(ops, WithMatchLabels(leaderLabel)) + } else { + ops = append(ops, WithMatchLabels(followerLabel)) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Second) + bo := retry.NewBackoffer(ctx, -1) + s.Nil(err) + resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...) + s.Nil(err) + + regionErr, err := resp.GetRegionError() + s.Nil(err) + s.Nil(regionErr) + getResp, ok := resp.Resp.(*kvrpcpb.GetResponse) + s.True(ok) + if localLeader { + s.NotEqual(getResp.Value, []byte("store"+leaderLabel[0].Value)) + } else { + s.Equal(getResp.Value, []byte("store"+followerLabel[0].Value)) + } + cancel() + } + } +} + +func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() { + leaderStore, _ := s.loadAndGetLeaderStore() + leaderLabel := []*metapb.StoreLabel{ + { + Key: "id", + Value: strconv.FormatUint(leaderStore.StoreID(), 10), + }, + } + regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + s.NotNil(regionLoc) + value := []byte("value") + isFirstReq := true + + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + select { + case <-ctx.Done(): + return nil, errors.New("timeout") + default: + } + // Return `DataIsNotReady` for the first time on leader. + if isFirstReq { + isFirstReq = false + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + DataIsNotReady: &errorpb.DataIsNotReady{}, + }}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil + }} + + region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID()) + s.True(region.isValid()) + + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + req.EnableStaleRead() + req.ReplicaReadType = kv.ReplicaReadMixed + var ops []StoreSelectorOption + ops = append(ops, WithMatchLabels(leaderLabel)) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + bo := retry.NewBackoffer(ctx, -1) + s.Nil(err) + resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...) + s.Nil(err) + + regionErr, err := resp.GetRegionError() + s.Nil(err) + s.Nil(regionErr) + getResp, ok := resp.Resp.(*kvrpcpb.GetResponse) + s.True(ok) + s.Equal(getResp.Value, value) +} diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go new file mode 100644 index 000000000..d1b5c6201 --- /dev/null +++ b/internal/locate/region_request_state_test.go @@ -0,0 +1,841 @@ +// Copyright 2023 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package locate + +import ( + "context" + "fmt" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/kvproto/pkg/errorpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/internal/apicodec" + "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" + "github.com/tikv/client-go/v2/internal/retry" + "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/metrics" + "github.com/tikv/client-go/v2/tikvrpc" +) + +type testRegionCacheStaleReadSuite struct { + *require.Assertions + cluster *mocktikv.Cluster + storeIDs []uint64 + peerIDs []uint64 + regionID uint64 + leaderPeer uint64 + store2zone map[uint64]string + cache *RegionCache + bo *retry.Backoffer + regionRequestSender *RegionRequestSender + mvccStore mocktikv.MVCCStore + injection testRegionCacheFSMSuiteInjection +} + +type testRegionCacheFSMSuiteInjection struct { + leaderRegionError func(*tikvrpc.Request, string) *errorpb.Error + followerRegionError func(*tikvrpc.Request, string) *errorpb.Error + unavailableStoreIDs map[uint64]struct{} + timeoutStoreIDs map[uint64]struct{} +} + +type SuccessReadType int + +const ( + ReadFail SuccessReadType = iota + SuccessLeaderRead + SuccessFollowerRead + SuccessStaleRead +) + +func (s *testRegionCacheStaleReadSuite) SetupTest() { + s.mvccStore = mocktikv.MustNewMVCCStore() + s.cluster = mocktikv.NewCluster(s.mvccStore) + s.storeIDs, s.peerIDs, s.regionID, s.leaderPeer, s.store2zone = mocktikv.BootstrapWithMultiZones(s.cluster, 3, 2) + pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)} + s.cache = NewRegionCache(pdCli) + s.bo = retry.NewNoopBackoff(context.Background()) + client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil) + s.regionRequestSender = NewRegionRequestSender(s.cache, client) + s.setClient() + s.injection = testRegionCacheFSMSuiteInjection{ + unavailableStoreIDs: make(map[uint64]struct{}), + } +} + +func (s *testRegionCacheStaleReadSuite) TearDownTest() { + s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(nil)) + s.cache.Close() + s.mvccStore.Close() +} + +func (s *testRegionCacheStaleReadSuite) getStore(leader bool) (uint64, *metapb.Store) { + var ( + zone string + peerID uint64 + storeID uint64 + ) + if leader { + zone = "z1" + } else { + zone = "z2" + } + region, _ := s.cluster.GetRegion(s.regionID) +FIND: + for _, peer := range region.Peers { + store := s.cluster.GetStore(peer.StoreId) + for _, label := range store.Labels { + if label.Key == "zone" && label.Value == zone { + peerID = peer.Id + storeID = peer.StoreId + break FIND + } + } + } + store := s.cluster.GetStore(storeID) + if store == nil { + return 0, nil + } + return peerID, store +} + +func (s *testRegionCacheStaleReadSuite) getLeader() (uint64, *metapb.Store) { + return s.getStore(true) +} + +func (s *testRegionCacheStaleReadSuite) getFollower() (uint64, *metapb.Store) { + return s.getStore(false) +} + +func (s *testRegionCacheStaleReadSuite) setClient() { + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + var store *metapb.Store + find := false + for _, one := range s.cluster.GetAllStores() { + if one.Address == addr { + store = one + find = true + break + } + } + if !find { + return nil, errors.New("no available connections") + } + if _, unavailable := s.injection.unavailableStoreIDs[store.Id]; unavailable { + return nil, errors.New("no available connections") + } + if _, timeout := s.injection.timeoutStoreIDs[store.Id]; timeout { + return nil, errors.WithMessage(context.DeadlineExceeded, "wait recvLoop") + } + + zone := "" + for _, label := range store.Labels { + if label.Key == "zone" { + zone = label.Value + break + } + } + response = &tikvrpc.Response{} + region, _ := s.cluster.GetRegion(s.regionID) + peerExist := false + for _, peer := range region.Peers { + if req.Peer.Id == peer.Id { + if peer.StoreId != store.Id { + response.Resp = &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + RegionNotFound: &errorpb.RegionNotFound{RegionId: s.regionID}, + }} + return + } + peerExist = true + } + } + if !peerExist { + response.Resp = &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + RegionNotFound: &errorpb.RegionNotFound{RegionId: s.regionID}, + }} + return + } + + _, leader := s.getLeader() + s.NotNil(leader) + isLeader := addr == leader.Address + if isLeader { + // leader region error + if s.injection.leaderRegionError != nil { + if regionRrr := s.injection.leaderRegionError(req, zone); regionRrr != nil { + response.Resp = &kvrpcpb.GetResponse{RegionError: regionRrr} + return + } + } + } else { + // follower read leader + if !req.ReplicaRead && !req.StaleRead { + _, leaderPeer, _ := s.cluster.GetRegionByID(s.regionID) + response.Resp = &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + NotLeader: &errorpb.NotLeader{ + RegionId: req.RegionId, + Leader: leaderPeer, + }, + }} + return + } + // follower region error + if s.injection.followerRegionError != nil { + if regionRrr := s.injection.followerRegionError(req, zone); regionRrr != nil { + response.Resp = &kvrpcpb.GetResponse{RegionError: regionRrr} + return + } + } + } + // no error + var successReadType SuccessReadType + if req.StaleRead { + successReadType = SuccessStaleRead + } else if isLeader { + successReadType = SuccessLeaderRead + } else { + successReadType = SuccessFollowerRead + } + s.NotEmpty(zone) + respStr := fmt.Sprintf("%d-%s-%d", store.Id, zone, successReadType) + response.Resp = &kvrpcpb.GetResponse{Value: []byte(respStr)} + return + }} + + tf := func(store *Store, bo *retry.Backoffer) livenessState { + _, ok := s.injection.unavailableStoreIDs[store.storeID] + if ok { + return unreachable + } + return reachable + } + s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) +} + +func (s *testRegionCacheStaleReadSuite) extractResp(resp *tikvrpc.Response) (uint64, string, SuccessReadType) { + resps := strings.Split(string(resp.Resp.(*kvrpcpb.GetResponse).Value), "-") + s.Len(resps, 3) + storeID, err := strconv.Atoi(resps[0]) + s.Nil(err) + successReadType, err := strconv.Atoi(resps[2]) + s.Nil(err) + return uint64(storeID), resps[1], SuccessReadType(successReadType) +} + +func (s *testRegionCacheStaleReadSuite) setUnavailableStore(id uint64) { + s.injection.unavailableStoreIDs[id] = struct{}{} +} + +func (s *testRegionCacheStaleReadSuite) setTimeout(id uint64) { //nolint: unused + s.injection.timeoutStoreIDs[id] = struct{}{} +} + +func TestRegionCacheStaleRead(t *testing.T) { + originReloadRegionInterval := atomic.LoadInt64(&reloadRegionInterval) + originBoTiKVServerBusy := retry.BoTiKVServerBusy + defer func() { + atomic.StoreInt64(&reloadRegionInterval, originReloadRegionInterval) + retry.BoTiKVServerBusy = originBoTiKVServerBusy + }() + atomic.StoreInt64(&reloadRegionInterval, int64(24*time.Hour)) // disable reload region + retry.BoTiKVServerBusy = retry.NewConfig("tikvServerBusy", &metrics.BackoffHistogramServerBusy, retry.NewBackoffFnCfg(2, 10, retry.EqualJitter), tikverr.ErrTiKVServerBusy) + regionCacheTestCases := []RegionCacheTestCase{ + { + do: followerDown, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{"z1"}, + leaderSuccessReadType: SuccessStaleRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z1"}, + followerSuccessReadType: SuccessLeaderRead, + }, + { + do: followerDownAndUp, + leaderRegionValid: true, + leaderAsyncReload: None[bool](), + leaderSuccessReplica: []string{"z1"}, + leaderSuccessReadType: SuccessStaleRead, + followerRegionValid: true, + followerAsyncReload: Some(true), + followerSuccessReplica: []string{"z1"}, + // because follower's epoch is changed, leader will be selected. + followerSuccessReadType: SuccessStaleRead, + }, + { + do: followerMove, + recoverable: true, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{"z1"}, + leaderSuccessReadType: SuccessStaleRead, + followerRegionValid: false, + followerAsyncReload: Some(false), + // may async reload region and access it from leader. + followerSuccessReplica: []string{}, + followerSuccessReadType: ReadFail, + }, + { + do: evictLeader, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + // leader is evicted, but can still serve as follower. + leaderSuccessReplica: []string{"z1"}, + leaderSuccessReadType: SuccessStaleRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z2"}, + followerSuccessReadType: SuccessStaleRead, + }, + { + do: leaderMove, + leaderRegionValid: false, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{}, + leaderSuccessReadType: ReadFail, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z2"}, + followerSuccessReadType: SuccessStaleRead, + }, + { + do: leaderDown, + leaderRegionValid: true, + leaderAsyncReload: Some(true), + leaderSuccessReplica: []string{"z2", "z3"}, + leaderSuccessReadType: SuccessFollowerRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z2"}, + followerSuccessReadType: SuccessStaleRead, + }, + { + do: leaderDownAndUp, + leaderRegionValid: true, + leaderAsyncReload: Some(true), + leaderSuccessReplica: []string{"z2", "z3"}, + leaderSuccessReadType: SuccessFollowerRead, + followerRegionValid: true, + followerAsyncReload: None[bool](), + followerSuccessReplica: []string{"z2"}, + followerSuccessReadType: SuccessStaleRead, + }, + { + do: leaderDownAndElect, + leaderRegionValid: true, + leaderAsyncReload: Some(true), + leaderSuccessReplica: []string{"z2", "z3"}, + leaderSuccessReadType: SuccessFollowerRead, + followerRegionValid: true, + followerAsyncReload: None[bool](), + followerSuccessReplica: []string{"z2"}, + followerSuccessReadType: SuccessStaleRead, + }, + { + do: leaderDataIsNotReady, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{"z1"}, + leaderSuccessReadType: SuccessLeaderRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z2"}, + followerSuccessReadType: SuccessStaleRead, + }, + { + do: followerDataIsNotReady, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{"z1"}, + leaderSuccessReadType: SuccessStaleRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z1"}, + followerSuccessReadType: SuccessLeaderRead, + }, + { + debug: true, + do: leaderServerIsBusy, + recoverable: true, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{"z2", "z3"}, + leaderSuccessReadType: SuccessFollowerRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z2"}, + followerSuccessReadType: SuccessStaleRead, + }, + { + do: followerServerIsBusy, + recoverable: true, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{"z1"}, + leaderSuccessReadType: SuccessStaleRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z1"}, + followerSuccessReadType: SuccessLeaderRead, + }, + { + do: leaderDataIsNotReady, + extra: []func(suite *testRegionCacheStaleReadSuite){followerServerIsBusy}, + recoverable: true, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{"z1"}, + leaderSuccessReadType: SuccessLeaderRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z1"}, + followerSuccessReadType: SuccessLeaderRead, + }, + { + do: leaderDataIsNotReady, + extra: []func(suite *testRegionCacheStaleReadSuite){followerDataIsNotReady}, + recoverable: true, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{"z1"}, + leaderSuccessReadType: SuccessLeaderRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z1"}, + followerSuccessReadType: SuccessLeaderRead, + }, + { + do: leaderDataIsNotReady, + extra: []func(suite *testRegionCacheStaleReadSuite){followerDown}, + recoverable: true, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{"z1"}, + leaderSuccessReadType: SuccessLeaderRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z1"}, + followerSuccessReadType: SuccessLeaderRead, + }, + { + do: leaderServerIsBusy, + extra: []func(suite *testRegionCacheStaleReadSuite){followerServerIsBusy}, + recoverable: true, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{"z3"}, + leaderSuccessReadType: SuccessFollowerRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z3"}, + followerSuccessReadType: SuccessFollowerRead, + }, + { + do: leaderServerIsBusy, + extra: []func(suite *testRegionCacheStaleReadSuite){followerDataIsNotReady}, + recoverable: true, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{"z2", "z3"}, + leaderSuccessReadType: SuccessFollowerRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z2", "z3"}, + followerSuccessReadType: SuccessFollowerRead, + }, + { + do: leaderServerIsBusy, + extra: []func(suite *testRegionCacheStaleReadSuite){followerDown}, + recoverable: true, + leaderRegionValid: true, + leaderAsyncReload: Some(false), + leaderSuccessReplica: []string{"z3"}, + leaderSuccessReadType: SuccessFollowerRead, + followerRegionValid: true, + followerAsyncReload: Some(false), + followerSuccessReplica: []string{"z3"}, + followerSuccessReadType: SuccessFollowerRead, + }, + { + do: leaderDown, + extra: []func(suite *testRegionCacheStaleReadSuite){followerDataIsNotReady}, + recoverable: true, + leaderRegionValid: true, + leaderAsyncReload: Some(true), + leaderSuccessReplica: []string{"z2", "z3"}, + leaderSuccessReadType: SuccessFollowerRead, + followerRegionValid: true, + followerAsyncReload: Some(true), + followerSuccessReplica: []string{"z2", "z3"}, + followerSuccessReadType: SuccessFollowerRead, + }, + { + do: leaderDown, + extra: []func(suite *testRegionCacheStaleReadSuite){followerServerIsBusy}, + recoverable: true, + leaderRegionValid: true, + leaderAsyncReload: Some(true), + leaderSuccessReplica: []string{"z3"}, + leaderSuccessReadType: SuccessFollowerRead, + followerRegionValid: true, + followerAsyncReload: Some(true), + followerSuccessReplica: []string{"z3"}, + followerSuccessReadType: SuccessFollowerRead, + }, + { + do: leaderDown, + extra: []func(suite *testRegionCacheStaleReadSuite){followerDown}, + recoverable: true, + leaderRegionValid: true, + leaderAsyncReload: Some(true), + leaderSuccessReplica: []string{"z3"}, + leaderSuccessReadType: SuccessFollowerRead, + followerRegionValid: true, + followerAsyncReload: Some(true), + followerSuccessReplica: []string{"z3"}, + followerSuccessReadType: SuccessFollowerRead, + }, + } + tests := []func(*testRegionCacheStaleReadSuite, *RegionCacheTestCase){ + testStaleReadFollower, testStaleReadLeader, + } + for _, regionCacheTestCase := range regionCacheTestCases { + for _, test := range tests { + s := &testRegionCacheStaleReadSuite{ + Assertions: require.New(t), + } + s.SetupTest() + _, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + regionCacheTestCase.do(s) + for _, extra := range regionCacheTestCase.extra { + extra(s) + } + test(s, ®ionCacheTestCase) + s.TearDownTest() + } + } +} + +func testStaleReadFollower(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase) { + testStaleRead(s, r, "z2") +} + +func testStaleReadLeader(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase) { + testStaleRead(s, r, "z1") +} + +func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zone string) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + leaderZone := zone == "z1" + var available bool + if leaderZone { + available = len(r.leaderSuccessReplica) > 0 + } else { + available = len(r.followerSuccessReplica) > 0 + } + + regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + s.NotNil(regionLoc) + + s.cache.mu.RLock() + region := s.cache.getRegionByIDFromCache(s.regionID) + s.cache.mu.RUnlock() + defer func() { + var ( + valid bool + asyncReload *bool + ) + if leaderZone { + valid = r.leaderRegionValid + asyncReload = r.leaderAsyncReload.Inner() + } else { + valid = r.followerRegionValid + asyncReload = r.followerAsyncReload.Inner() + } + s.Equal(valid, region.isValid()) + + if asyncReload == nil { + return + } + + s.cache.regionsNeedReload.Lock() + if *asyncReload { + s.Len(s.cache.regionsNeedReload.regions, 1) + s.Equal(s.cache.regionsNeedReload.regions[0], s.regionID) + } else { + s.Empty(s.cache.regionsNeedReload.regions) + } + s.cache.regionsNeedReload.Unlock() + }() + + bo := retry.NewBackoffer(ctx, -1) + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadMixed, nil) + req.EnableStaleRead() + ops := []StoreSelectorOption{WithMatchLabels([]*metapb.StoreLabel{{ + Key: "zone", + Value: zone, + }})} + + resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...) + if !available { + if err != nil { + return + } + regionErr, err := resp.GetRegionError() + s.Nil(err) + s.NotNil(regionErr) + return + } + + _, successZone, successReadType := s.extractResp(resp) + find := false + if leaderZone { + s.Equal(r.leaderSuccessReadType, successReadType) + for _, z := range r.leaderSuccessReplica { + if z == successZone { + find = true + break + } + } + } else { + s.Equal(r.followerSuccessReadType, successReadType) + for _, z := range r.followerSuccessReplica { + if z == successZone { + find = true + break + } + } + } + s.True(find) +} + +type Option[T interface{}] struct { + inner *T +} + +func Some[T interface{}](inner T) Option[T] { + return Option[T]{inner: &inner} +} + +func None[T interface{}]() Option[T] { + return Option[T]{inner: nil} +} + +func (o Option[T]) Inner() *T { + return o.inner +} + +type RegionCacheTestCase struct { + debug bool + do func(s *testRegionCacheStaleReadSuite) + extra []func(s *testRegionCacheStaleReadSuite) + recoverable bool + // local peer is leader + leaderRegionValid bool + leaderAsyncReload Option[bool] + leaderSuccessReplica []string + leaderSuccessReadType SuccessReadType + // local peer is follower + followerRegionValid bool + followerAsyncReload Option[bool] + followerSuccessReplica []string + followerSuccessReadType SuccessReadType +} + +func followerDown(s *testRegionCacheStaleReadSuite) { + _, follower := s.getFollower() + s.NotNil(follower) + s.setUnavailableStore(follower.Id) +} + +func followerDownAndUp(s *testRegionCacheStaleReadSuite) { + s.cache.mu.RLock() + cachedRegion := s.cache.getRegionByIDFromCache(s.regionID) + s.cache.mu.RUnlock() + _, follower := s.getFollower() + s.NotNil(cachedRegion) + s.NotNil(follower) + regionStore := cachedRegion.getStore() + for _, storeIdx := range regionStore.accessIndex[tiKVOnly] { + if regionStore.stores[storeIdx].storeID == follower.Id { + atomic.AddUint32(®ionStore.stores[storeIdx].epoch, 1) + } + } +} + +func followerMove(s *testRegionCacheStaleReadSuite) { + peerID, follower := s.getFollower() + zone := "" + for _, label := range follower.Labels { + if label.Key == "zone" { + zone = label.Value + break + } + } + s.NotEqual("", zone) + var target *metapb.Store +FIND: + for _, store := range s.cluster.GetAllStores() { + if store.Id == follower.Id { + continue + } + for _, label := range store.Labels { + if label.Key == "zone" && label.Value == zone { + target = store + break FIND + } + } + } + s.NotNil(target) + s.cluster.RemovePeer(s.regionID, peerID) + s.cluster.AddPeer(s.regionID, target.Id, peerID) +} + +func evictLeader(s *testRegionCacheStaleReadSuite) { + region, leader := s.cluster.GetRegion(s.regionID) + for _, peer := range region.Peers { + if peer.Id != leader { + s.cluster.ChangeLeader(s.regionID, peer.Id) + return + } + } + s.Fail("unreachable") +} + +func leaderMove(s *testRegionCacheStaleReadSuite) { + peerID, leader := s.getLeader() + zone := "" + for _, label := range leader.Labels { + if label.Key == "zone" { + zone = label.Value + break + } + } + s.NotEqual("", zone) + var target *metapb.Store +FIND: + for _, store := range s.cluster.GetAllStores() { + if store.Id == leader.Id { + continue + } + for _, label := range store.Labels { + if label.Key == "zone" && label.Value == zone { + target = store + break FIND + } + } + } + s.NotNil(target) + s.cluster.RemovePeer(s.regionID, peerID) + s.cluster.AddPeer(s.regionID, target.Id, peerID) + s.cluster.ChangeLeader(s.regionID, peerID) +} + +func leaderDown(s *testRegionCacheStaleReadSuite) { + _, leader := s.getLeader() + s.NotNil(leader) + s.setUnavailableStore(leader.Id) +} + +func leaderDownAndUp(s *testRegionCacheStaleReadSuite) { + s.cache.mu.RLock() + cachedRegion := s.cache.getRegionByIDFromCache(s.regionID) + s.cache.mu.RUnlock() + _, leader := s.getLeader() + s.NotNil(cachedRegion) + s.NotNil(leader) + regionStore := cachedRegion.getStore() + for _, storeIdx := range regionStore.accessIndex[tiKVOnly] { + if regionStore.stores[storeIdx].storeID == leader.Id { + atomic.AddUint32(®ionStore.stores[storeIdx].epoch, 1) + } + } +} +func leaderDownAndElect(s *testRegionCacheStaleReadSuite) { + _, leader := s.getLeader() + s.NotNil(leader) + leaderMove(s) + s.setUnavailableStore(leader.Id) +} + +func leaderDataIsNotReady(s *testRegionCacheStaleReadSuite) { + peerID, _ := s.getLeader() + s.injection.leaderRegionError = func(req *tikvrpc.Request, zone string) *errorpb.Error { + if !req.StaleRead || zone != "z1" { + return nil + } + return &errorpb.Error{ + DataIsNotReady: &errorpb.DataIsNotReady{ + RegionId: s.regionID, + PeerId: peerID, + SafeTs: 0, + }, + } + } +} + +func leaderServerIsBusy(s *testRegionCacheStaleReadSuite) { + s.injection.leaderRegionError = func(req *tikvrpc.Request, zone string) *errorpb.Error { + if zone != "z1" { + return nil + } + return &errorpb.Error{ + ServerIsBusy: &errorpb.ServerIsBusy{ + Reason: "test", + BackoffMs: 1, + }, + } + } +} + +func followerDataIsNotReady(s *testRegionCacheStaleReadSuite) { + s.injection.followerRegionError = func(req *tikvrpc.Request, zone string) *errorpb.Error { + if !req.StaleRead || zone != "z2" { + return nil + } + return &errorpb.Error{ + DataIsNotReady: &errorpb.DataIsNotReady{ + RegionId: s.regionID, + SafeTs: 0, + }, + } + } +} + +func followerServerIsBusy(s *testRegionCacheStaleReadSuite) { + s.injection.followerRegionError = func(req *tikvrpc.Request, zone string) *errorpb.Error { + if zone != "z2" { + return nil + } + return &errorpb.Error{ + ServerIsBusy: &errorpb.ServerIsBusy{ + Reason: "test", + BackoffMs: 1, + }, + } + } +} diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 7f52d34cd..42c8ceb34 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -608,7 +608,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} st := &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}) + s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) @@ -618,7 +618,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { v3 := region.Region.confVer + 1 r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}} st = &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}) + s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) diff --git a/internal/mockstore/mocktikv/cluster.go b/internal/mockstore/mocktikv/cluster.go index 6a65ab68b..fc8af00d2 100644 --- a/internal/mockstore/mocktikv/cluster.go +++ b/internal/mockstore/mocktikv/cluster.go @@ -393,11 +393,11 @@ func (c *Cluster) AddPeer(regionID, storeID, peerID uint64) { // RemovePeer removes the Peer from the Region. Note that if the Peer is leader, // the Region will have no leader before calling ChangeLeader(). -func (c *Cluster) RemovePeer(regionID, storeID uint64) { +func (c *Cluster) RemovePeer(regionID, peerID uint64) { c.Lock() defer c.Unlock() - c.regions[regionID].removePeer(storeID) + c.regions[regionID].removePeer(peerID) } // ChangeLeader sets the Region's leader Peer. Caller should guarantee the Peer diff --git a/internal/mockstore/mocktikv/cluster_manipulate.go b/internal/mockstore/mocktikv/cluster_manipulate.go index ae02beeb5..d688cde4b 100644 --- a/internal/mockstore/mocktikv/cluster_manipulate.go +++ b/internal/mockstore/mocktikv/cluster_manipulate.go @@ -80,3 +80,29 @@ func BootstrapWithMultiRegions(cluster *Cluster, splitKeys ...[]byte) (storeID u } return } + +// BootstrapWithMultiZones initializes a Cluster with 1 Region and n Zones and m Stores in each Zone. +func BootstrapWithMultiZones(cluster *Cluster, n, m int) (storeIDs, peerIDs []uint64, regionID uint64, leaderPeer uint64, store2zone map[uint64]string) { + storeIDs = cluster.AllocIDs(n * m) + peerIDs = cluster.AllocIDs(n) + leaderPeer = peerIDs[0] + regionID = cluster.AllocID() + store2zone = make(map[uint64]string, n*m) + for id, storeID := range storeIDs { + zone := fmt.Sprintf("z%d", (id%n)+1) + store2zone[storeID] = zone + labels := []*metapb.StoreLabel{ + { + Key: "id", + Value: fmt.Sprintf("%v", storeID), + }, + { + Key: "zone", + Value: zone, + }, + } + cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID), labels...) + } + cluster.Bootstrap(regionID, storeIDs[:n], peerIDs, leaderPeer) + return +}