diff --git a/config/config.go b/config/config.go
index d0273fb3d1b88..11a5ef882aa88 100644
--- a/config/config.go
+++ b/config/config.go
@@ -50,6 +50,8 @@ const (
 	DefPort = 4000
 	// DefStatusPort is the default status port of TiBD
 	DefStatusPort = 10080
+	// DefStoreLivenessTimeout is the default value for store liveness timeout.
+	DefStoreLivenessTimeout = "120s"
 )
 
 // Valid config maps
@@ -449,6 +451,8 @@ type TiKVClient struct {
 	// If a store has been up to the limit, it will return error for successive request to
 	// prevent the store occupying too much token in dispatching level.
 	StoreLimit int64 `toml:"store-limit" json:"store-limit"`
+	// StoreLivenessTimeout is the timeout for the store liveness check request.
+	StoreLivenessTimeout string `toml:"store-liveness-timeout" json:"store-liveness-timeout"`
 
 	CoprCache CoprocessorCache `toml:"copr-cache" json:"copr-cache"`
 }
@@ -627,8 +631,9 @@ var defaultConf = Config{
 		EnableChunkRPC: true,
 
-		RegionCacheTTL: 600,
-		StoreLimit:     0,
+		RegionCacheTTL:       600,
+		StoreLimit:           0,
+		StoreLivenessTimeout: DefStoreLivenessTimeout,
 		CoprCache: CoprocessorCache{
 			Enabled: true,
diff --git a/config/config.toml.example b/config/config.toml.example
index fafaa390561d5..52f99090a0749 100644
--- a/config/config.toml.example
+++ b/config/config.toml.example
@@ -348,6 +348,9 @@ region-cache-ttl = 600
 # default 0 means shutting off store limit.
 store-limit = 0
 
+# store-liveness-timeout is used to control the timeout of the store liveness check after a request to the store fails.
+store-liveness-timeout = "120s"
+
 [tikv-client.copr-cache]
 # Whether to enable the copr cache. The copr cache saves the result from TiKV Coprocessor in the memory and
 # reuses the result when corresponding data in TiKV is unchanged, on a region basis.
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 6e7571f7fb642..cd9509486511b 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -149,6 +149,8 @@ func RegisterMetrics() {
 	prometheus.MustRegister(TotalCopProcHistogram)
 	prometheus.MustRegister(TotalCopWaitHistogram)
 	prometheus.MustRegister(TiKVPendingBatchRequests)
+	prometheus.MustRegister(TiKVStatusDuration)
+	prometheus.MustRegister(TiKVStatusCounter)
 	prometheus.MustRegister(TiKVBatchWaitDuration)
 	prometheus.MustRegister(TiKVBatchClientUnavailable)
 	prometheus.MustRegister(TiKVRangeTaskStats)
diff --git a/metrics/tikvclient.go b/metrics/tikvclient.go
index e23a6daa309da..86a29401afaf6 100644
--- a/metrics/tikvclient.go
+++ b/metrics/tikvclient.go
@@ -156,6 +156,23 @@ var (
 			Help:      "Pending batch requests",
 		}, []string{"store"})
 
+	TiKVStatusDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "tidb",
+			Subsystem: "tikvclient",
+			Name:      "kv_status_api_duration",
+			Help:      "duration for kv status api.",
+			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 20), // 0.5ms ~ 262s
+		}, []string{"store"})
+
+	TiKVStatusCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "tidb",
+			Subsystem: "tikvclient",
+			Name:      "kv_status_api_count",
+			Help:      "Counter of access to the kv status api.",
+		}, []string{LblResult})
+
 	TiKVBatchWaitDuration = prometheus.NewHistogram(
 		prometheus.HistogramOpts{
 			Namespace: "tidb",
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index 07d8a92791426..5a372446f3c60 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -17,6 +17,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"net/http"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -28,11 +29,14 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	pd "github.com/pingcap/pd/v4/client"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/metrics"
+	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/logutil"
 	atomic2 "go.uber.org/atomic"
 	"go.uber.org/zap"
+	"golang.org/x/sync/singleflight"
 )
 
 const (
@@ -56,6 +60,9 @@ var (
 	tikvRegionCacheCounterWithGetStoreOK               = metrics.TiKVRegionCacheCounter.WithLabelValues("get_store", "ok")
 	tikvRegionCacheCounterWithGetStoreError            = metrics.TiKVRegionCacheCounter.WithLabelValues("get_store", "err")
 	tikvRegionCacheCounterWithInvalidateStoreRegionsOK = metrics.TiKVRegionCacheCounter.WithLabelValues("invalidate_store_regions", "ok")
+
+	tikvStatusCountWithOK    = metrics.TiKVStatusCounter.WithLabelValues("ok")
+	tikvStatusCountWithError = metrics.TiKVStatusCounter.WithLabelValues("err")
 )
 
 const (
@@ -77,18 +84,18 @@ type RegionStore struct {
 	workTiKVIdx    int32    // point to current work peer in meta.Peers and work store in stores(same idx) for tikv peer
 	workTiFlashIdx int32    // point to current work peer in meta.Peers and work store in stores(same idx) for tiflash peer
 	stores         []*Store // stores in this region
-	storeFails     []uint32 // snapshots of store's fail, need reload when `storeFails[curr] != stores[cur].fail`
+	storeEpochs    []uint32 // snapshots of store's epoch, need reload when `storeEpochs[curr] != stores[curr].epoch`
 }
 
 // clone clones region store struct.
 func (r *RegionStore) clone() *RegionStore {
-	storeFails := make([]uint32, len(r.stores))
-	copy(storeFails, r.storeFails)
+	storeEpochs := make([]uint32, len(r.stores))
+	copy(storeEpochs, r.storeEpochs)
 	return &RegionStore{
 		workTiFlashIdx: r.workTiFlashIdx,
 		workTiKVIdx:    r.workTiKVIdx,
 		stores:         r.stores,
-		storeFails:     storeFails,
+		storeEpochs:    storeEpochs,
 	}
 }
 
@@ -107,7 +114,7 @@ func (r *RegionStore) follower(seed uint32) int32 {
 		if r.stores[followerIdx].storeType != kv.TiKV {
 			continue
 		}
-		if r.storeFails[followerIdx] == atomic.LoadUint32(&r.stores[followerIdx].fail) {
+		if r.storeEpochs[followerIdx] == atomic.LoadUint32(&r.stores[followerIdx].epoch) {
 			return followerIdx
 		}
 		seed++
@@ -122,7 +129,7 @@ func (r *RegionStore) peer(seed uint32) int32 {
 		if r.stores[i].storeType != kv.TiKV {
 			continue
 		}
-		if r.storeFails[i] != atomic.LoadUint32(&r.stores[i].fail) {
+		if r.storeEpochs[i] != atomic.LoadUint32(&r.stores[i].epoch) {
 			continue
 		}
 		candidates = append(candidates, int32(i))
@@ -142,7 +149,7 @@ func (r *Region) init(c *RegionCache) {
 		workTiKVIdx:    0,
 		workTiFlashIdx: 0,
 		stores:         make([]*Store, 0, len(r.meta.Peers)),
-		storeFails:     make([]uint32, 0, len(r.meta.Peers)),
+		storeEpochs:    make([]uint32, 0, len(r.meta.Peers)),
 	}
 	for _, p := range r.meta.Peers {
 		c.storeMu.RLock()
@@ -152,7 +159,7 @@ func (r *Region) init(c *RegionCache) {
 			store = c.getStoreByStoreID(p.StoreId)
 		}
 		rs.stores = append(rs.stores, store)
-		rs.storeFails = append(rs.storeFails, atomic.LoadUint32(&store.fail))
+		rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
 	}
 
 	atomic.StorePointer(&r.store, unsafe.Pointer(rs))
@@ -346,8 +353,8 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
 		return nil, nil
 	}
 
-	storeFailEpoch := atomic.LoadUint32(&store.fail)
-	if storeFailEpoch != regionStore.storeFails[storeIdx] {
+	storeFailEpoch := atomic.LoadUint32(&store.epoch)
+	if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
 		cachedRegion.invalidate()
 		logutil.BgLogger().Info("invalidate current region, because others failed on same store",
 			zap.Uint64("region", id.GetID()),
@@ -401,8 +408,8 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
 		}
 		atomic.StoreInt32(&regionStore.workTiFlashIdx, int32(storeIdx))
 		peer := cachedRegion.meta.Peers[storeIdx]
-		storeFailEpoch := atomic.LoadUint32(&store.fail)
-		if storeFailEpoch != regionStore.storeFails[storeIdx] {
+		storeFailEpoch := atomic.LoadUint32(&store.epoch)
+		if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
 			cachedRegion.invalidate()
 			logutil.BgLogger().Info("invalidate current region, because others failed on same store",
 				zap.Uint64("region", id.GetID()),
@@ -511,11 +518,34 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
 	tikvRegionCacheCounterWithSendFail.Inc()
 	r := c.getCachedRegionWithRLock(ctx.Region)
 	if r != nil {
+		rs := r.getStore()
+		if err != nil {
+			s := rs.stores[ctx.PeerIdx]
+
+			// send failed but the store is reachable, keep retrying the current peer.
+			if s.requestLiveness(bo) == reachable {
+				return
+			}
+
+			// invalidate regions in the store.
+			epoch := rs.storeEpochs[ctx.PeerIdx]
+			if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
+				logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
+				tikvRegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
+			}
+
+			// schedule a store addr resolve.
+			s.markNeedCheck(c.notifyCheckCh)
+		}
+
+		// try the next peer to find the new leader.
 		if ctx.Store.storeType == kv.TiKV {
-			c.switchNextPeer(r, ctx.PeerIdx, err)
+			rs.switchNextPeer(r, ctx.PeerIdx)
 		} else {
-			c.switchNextFlashPeer(r, ctx.PeerIdx, err)
+			rs.switchNextFlashPeer(r, ctx.PeerIdx)
 		}
+
+		// force reload region after retrying all known peers in the region.
 		if scheduleReload {
 			r.scheduleReload()
 		}
@@ -724,7 +754,8 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
 	}
 
 	if leaderStoreID == 0 {
-		c.switchNextPeer(r, currentPeerIdx, nil)
+		rs := r.getStore()
+		rs.switchNextPeer(r, currentPeerIdx)
 		logutil.BgLogger().Info("switch region peer to next due to NotLeader with NULL leader",
 			zap.Int("currIdx", currentPeerIdx),
 			zap.Uint64("regionID", regionID.GetID()))
@@ -1189,49 +1220,25 @@ func (c *RegionCache) switchToPeer(r *Region, targetStoreID uint64) (found bool)
 	return
 }
 
-func (c *RegionCache) switchNextFlashPeer(r *Region, currentPeerIdx int, err error) {
-	rs := r.getStore()
-
-	if err != nil { // TODO: refine err, only do this for some errors.
-		s := rs.stores[currentPeerIdx]
-		epoch := rs.storeFails[currentPeerIdx]
-		if atomic.CompareAndSwapUint32(&s.fail, epoch, epoch+1) {
-			logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
-			tikvRegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
-		}
-		s.markNeedCheck(c.notifyCheckCh)
-	}
-
-	nextIdx := (currentPeerIdx + 1) % len(rs.stores)
-	newRegionStore := rs.clone()
+func (r *RegionStore) switchNextFlashPeer(rr *Region, currentPeerIdx int) {
+	nextIdx := (currentPeerIdx + 1) % len(r.stores)
+	newRegionStore := r.clone()
 	newRegionStore.workTiFlashIdx = int32(nextIdx)
-	r.compareAndSwapStore(rs, newRegionStore)
+	rr.compareAndSwapStore(r, newRegionStore)
 }
 
-func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int, err error) {
-	rs := r.getStore()
-
-	if err != nil { // TODO: refine err, only do this for some errors.
-		s := rs.stores[currentPeerIdx]
-		epoch := rs.storeFails[currentPeerIdx]
-		if atomic.CompareAndSwapUint32(&s.fail, epoch, epoch+1) {
-			logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
-			tikvRegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
-		}
-		s.markNeedCheck(c.notifyCheckCh)
-	}
-
-	if int(rs.workTiKVIdx) != currentPeerIdx {
+func (r *RegionStore) switchNextPeer(rr *Region, currentPeerIdx int) {
+	if int(r.workTiKVIdx) != currentPeerIdx {
 		return
 	}
-	nextIdx := (currentPeerIdx + 1) % len(rs.stores)
-	for rs.stores[nextIdx].storeType == kv.TiFlash {
-		nextIdx = (nextIdx + 1) % len(rs.stores)
+	nextIdx := (currentPeerIdx + 1) % len(r.stores)
+	for r.stores[nextIdx].storeType == kv.TiFlash {
+		nextIdx = (nextIdx + 1) % len(r.stores)
 	}
-	newRegionStore := rs.clone()
+	newRegionStore := r.clone()
 	newRegionStore.workTiKVIdx = int32(nextIdx)
-	r.compareAndSwapStore(rs, newRegionStore)
+	rr.compareAndSwapStore(r, newRegionStore)
 }
 
 func (c *RegionCache) getPeerStoreIndex(r *Region, id uint64) (idx int, found bool) {
@@ -1280,10 +1287,11 @@ func (r *Region) ContainsByEnd(key []byte) bool {
 // Store contains a kv process's address.
 type Store struct {
 	addr         string        // loaded store address
+	saddr        string        // loaded store status address
 	storeID      uint64        // store's id
 	state        uint64        // unsafe store storeState
 	resolveMutex sync.Mutex    // protect pd from concurrent init requests
-	fail         uint32        // store fail count, see RegionStore.storeFails
+	epoch        uint32        // store fail epoch, see RegionStore.storeEpochs
 	storeType    kv.StoreType  // type of the store
 	tokenCount   atomic2.Int64 // used store token count
 }
@@ -1330,6 +1338,7 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
 		}
 		addr = store.GetAddress()
 		s.addr = addr
+		s.saddr = store.GetStatusAddress()
 		s.storeType = GetStoreTypeByMeta(store)
 	retry:
 		state = s.getResolveState()
@@ -1376,7 +1385,7 @@ func (s *Store) reResolve(c *RegionCache) {
 		// store has be removed in PD, we should invalidate all regions using those store.
 		logutil.BgLogger().Info("invalidate regions in removed store",
 			zap.Uint64("store", s.storeID), zap.String("add", s.addr))
-		atomic.AddUint32(&s.fail, 1)
+		atomic.AddUint32(&s.epoch, 1)
 		tikvRegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
 		return
 	}
@@ -1385,8 +1394,8 @@ func (s *Store) reResolve(c *RegionCache) {
 	addr = store.GetAddress()
 	if s.addr != addr {
 		state := resolved
-		newStore := &Store{storeID: s.storeID, addr: addr, storeType: storeType}
-		newStore.state = *(*uint64)(unsafe.Pointer(&state))
+		newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType}
+		newStore.state = *(*uint64)(&state)
 		c.storeMu.Lock()
 		c.storeMu.stores[newStore.storeID] = newStore
 		c.storeMu.Unlock()
@@ -1441,3 +1450,91 @@ retry:
 	}
 
 }
+
+type livenessState uint32
+
+var (
+	livenessSf           singleflight.Group
+	storeLivenessTimeout time.Duration
+)
+
+const (
+	unknown livenessState = iota
+	reachable
+	unreachable
+	offline
+)
+
+func init() {
+	t, err := time.ParseDuration(config.GetGlobalConfig().TiKVClient.StoreLivenessTimeout)
+	if err != nil {
+		logutil.BgLogger().Fatal("invalid duration value for store-liveness-timeout",
+			zap.String("currentValue", config.GetGlobalConfig().TiKVClient.StoreLivenessTimeout))
+	}
+	storeLivenessTimeout = t
+}
+
+func (s *Store) requestLiveness(bo *Backoffer) (l livenessState) {
+	saddr := s.saddr
+	if len(saddr) == 0 {
+		l = unknown
+		return
+	}
+	rsCh := livenessSf.DoChan(saddr, func() (interface{}, error) {
+		return invokeKVStatusAPI(saddr, storeLivenessTimeout), nil
+	})
+	var ctx context.Context
+	if bo != nil {
+		ctx = bo.ctx
+	} else {
+		ctx = context.Background()
+	}
+	select {
+	case rs := <-rsCh:
+		l = rs.Val.(livenessState)
+	case <-ctx.Done():
+		l = unknown
+		return
+	}
+	return
+}
+
+func invokeKVStatusAPI(saddr string, timeout time.Duration) (l livenessState) {
+	start := time.Now()
+	defer func() {
+		if l == reachable {
+			tikvStatusCountWithOK.Inc()
+		} else {
+			tikvStatusCountWithError.Inc()
+		}
+		metrics.TiKVStatusDuration.WithLabelValues(saddr).Observe(time.Since(start).Seconds())
+	}()
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	url := fmt.Sprintf("%s://%s/status", util.InternalHTTPSchema(), saddr)
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		logutil.BgLogger().Info("[liveness] build kv status request fail", zap.String("store", saddr), zap.Error(err))
+		l = unreachable
+		return
+	}
+	resp, err := util.InternalHTTPClient().Do(req)
+	if err != nil {
+		logutil.BgLogger().Info("[liveness] request kv status fail", zap.String("store", saddr), zap.Error(err))
+		l = unreachable
+		return
+	}
+	defer func() {
+		err1 := resp.Body.Close()
+		if err1 != nil {
+			logutil.BgLogger().Debug("[liveness] close kv status api body failed", zap.String("store", saddr), zap.Error(err1))
+		}
+	}()
+	if resp.StatusCode != http.StatusOK {
+		logutil.BgLogger().Info("[liveness] request kv status fail", zap.String("store", saddr), zap.String("status", resp.Status))
+		l = unreachable
+		return
+	}
+	l = reachable
+	return
+}
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go
index 2d52bdb13f402..06cdca6d56cdb 100644
--- a/store/tikv/region_cache_test.go
+++ b/store/tikv/region_cache_test.go
@@ -1139,8 +1139,8 @@ func BenchmarkOnRequestFail(b *testing.B) {
 				Store:  store,
 			}
 			r := cache.getCachedRegionWithRLock(rpcCtx.Region)
-			if r == nil {
-				cache.switchNextPeer(r, rpcCtx.PeerIdx, nil)
+			if r != nil {
+				r.getStore().switchNextPeer(r, rpcCtx.PeerIdx)
 			}
 		}
 	})