diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go
index dea0c98148aa0..b12ced3f65c12 100644
--- a/store/copr/batch_request_sender.go
+++ b/store/copr/batch_request_sender.go
@@ -15,7 +15,6 @@ package copr
 
 import (
 	"context"
-	"sync/atomic"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -80,7 +79,7 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx
 	// If it failed because the context is cancelled by ourself, don't retry.
 	if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
 		return errors.Trace(err)
-	} else if atomic.LoadUint32(&tikv.ShuttingDown) > 0 {
+	} else if tikv.LoadShuttingDown() > 0 {
 		return tikverr.ErrTiDBShuttingDown
 	}
 
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index 2fb006138dff5..50eb437545024 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -55,8 +55,13 @@ const (
 	defaultRegionsPerBatch = 128
 )
 
-// RegionCacheTTLSec is the max idle time for regions in the region cache.
-var RegionCacheTTLSec int64 = 600
+// regionCacheTTLSec is the max idle time for regions in the region cache.
+var regionCacheTTLSec int64 = 600
+
+// SetRegionCacheTTLSec sets regionCacheTTLSec to t.
+func SetRegionCacheTTLSec(t int64) {
+	regionCacheTTLSec = t
+}
 
 const (
 	updated int32 = iota // region is updated and no need to reload.
@@ -254,7 +259,7 @@ func (r *Region) checkRegionCacheTTL(ts int64) bool {
 	})
 	for {
 		lastAccess := atomic.LoadInt64(&r.lastAccess)
-		if ts-lastAccess > RegionCacheTTLSec {
+		if ts-lastAccess > regionCacheTTLSec {
 			return false
 		}
 		if atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) {
@@ -1180,7 +1185,7 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
 		return nil
 	}
 	lastAccess := atomic.LoadInt64(&latestRegion.lastAccess)
-	if ts-lastAccess > RegionCacheTTLSec {
+	if ts-lastAccess > regionCacheTTLSec {
 		return nil
 	}
 	if latestRegion != nil {
@@ -2070,10 +2075,20 @@ type livenessState uint32
 
 var (
 	livenessSf singleflight.Group
-	// StoreLivenessTimeout is the max duration of resolving liveness of a TiKV instance.
-	StoreLivenessTimeout time.Duration
+	// storeLivenessTimeout is the max duration of resolving liveness of a TiKV instance.
+	storeLivenessTimeout time.Duration
 )
 
+// SetStoreLivenessTimeout sets storeLivenessTimeout to t.
+func SetStoreLivenessTimeout(t time.Duration) {
+	storeLivenessTimeout = t
+}
+
+// GetStoreLivenessTimeout returns storeLivenessTimeout.
+func GetStoreLivenessTimeout() time.Duration {
+	return storeLivenessTimeout
+}
+
 const (
 	unknown livenessState = iota
 	reachable
@@ -2136,7 +2151,7 @@ func (s *Store) requestLiveness(bo *Backoffer, c *RegionCache) (l livenessState)
 		return c.testingKnobs.mockRequestLiveness(s, bo)
 	}
 
-	if StoreLivenessTimeout == 0 {
+	if storeLivenessTimeout == 0 {
 		return unreachable
 	}
 
@@ -2146,7 +2161,7 @@ func (s *Store) requestLiveness(bo *Backoffer, c *RegionCache) (l livenessState)
 	}
 	addr := s.addr
 	rsCh := livenessSf.DoChan(addr, func() (interface{}, error) {
-		return invokeKVStatusAPI(addr, StoreLivenessTimeout), nil
+		return invokeKVStatusAPI(addr, storeLivenessTimeout), nil
 	})
 	var ctx context.Context
 	if bo != nil {
diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go
index b8fcc837c1bc6..a90d638233578 100644
--- a/store/tikv/region_request.go
+++ b/store/tikv/region_request.go
@@ -44,8 +44,19 @@ import (
 // ShuttingDown is a flag to indicate tidb-server is exiting (Ctrl+C signal
 // receved for example). If this flag is set, tikv client should not retry on
 // network error because tidb-server expect tikv client to exit as soon as possible.
+// TODO: make it private when br is ready.
 var ShuttingDown uint32
 
+// StoreShuttingDown atomically stores ShuttingDown into v.
+func StoreShuttingDown(v uint32) {
+	atomic.StoreUint32(&ShuttingDown, v)
+}
+
+// LoadShuttingDown atomically loads ShuttingDown.
+func LoadShuttingDown() uint32 {
+	return atomic.LoadUint32(&ShuttingDown)
+}
+
 // RegionRequestSender sends KV/Cop requests to tikv server. It handles network
 // errors and some region errors internally.
 //
@@ -555,7 +566,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err
 	// If it failed because the context is cancelled by ourself, don't retry.
 	if errors.Cause(err) == context.Canceled {
 		return errors.Trace(err)
-	} else if atomic.LoadUint32(&ShuttingDown) > 0 {
+	} else if LoadShuttingDown() > 0 {
 		return tikverr.ErrTiDBShuttingDown
 	}
 	if status.Code(errors.Cause(err)) == codes.Canceled {
diff --git a/tidb-server/main.go b/tidb-server/main.go
index f1b16397046f9..f99722e8bfd13 100644
--- a/tidb-server/main.go
+++ b/tidb-server/main.go
@@ -562,7 +562,7 @@ func setGlobalVars() {
 	}
 
 	atomic.StoreUint64(&tikv.CommitMaxBackoff, uint64(parseDuration(cfg.TiKVClient.CommitTimeout).Seconds()*1000))
-	tikv.RegionCacheTTLSec = int64(cfg.TiKVClient.RegionCacheTTL)
+	tikv.SetRegionCacheTTLSec(int64(cfg.TiKVClient.RegionCacheTTL))
 	domainutil.RepairInfo.SetRepairMode(cfg.RepairMode)
 	domainutil.RepairInfo.SetRepairTableList(cfg.RepairTableList)
 	executor.GlobalDiskUsageTracker.SetBytesLimit(cfg.TempStorageQuota)
@@ -579,7 +579,7 @@ func setGlobalVars() {
 		logutil.BgLogger().Fatal("invalid duration value for store-liveness-timeout",
 			zap.String("currentValue", cfg.TiKVClient.StoreLivenessTimeout))
 	}
-	tikv.StoreLivenessTimeout = t
+	tikv.SetStoreLivenessTimeout(t)
 
 	parsertypes.TiDBStrictIntegerDisplayWidth = cfg.DeprecateIntegerDisplayWidth
 }
@@ -649,7 +649,7 @@ func setupTracing() {
 }
 
 func closeDomainAndStorage(storage kv.Storage, dom *domain.Domain) {
-	atomic.StoreUint32(&tikv.ShuttingDown, 1)
+	tikv.StoreShuttingDown(1)
 	dom.Close()
 	err := storage.Close()
 	terror.Log(errors.Trace(err))
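
For reference, a minimal sketch of how caller code is expected to drive these settings after this change, using only the accessors introduced in the patch (SetRegionCacheTTLSec, SetStoreLivenessTimeout, GetStoreLivenessTimeout, StoreShuttingDown, LoadShuttingDown); the import path assumes the in-tree package github.com/pingcap/tidb/store/tikv, not a definitive external API:

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/store/tikv"
)

func main() {
	// Configure the region cache TTL and the store liveness timeout through
	// the new setters instead of assigning to package-level variables.
	tikv.SetRegionCacheTTLSec(600)
	tikv.SetStoreLivenessTimeout(3 * time.Second)
	fmt.Println("store liveness timeout:", tikv.GetStoreLivenessTimeout())

	// On shutdown, set the flag atomically; the region request senders read it
	// via LoadShuttingDown() and stop retrying on network errors.
	tikv.StoreShuttingDown(1)
	if tikv.LoadShuttingDown() > 0 {
		fmt.Println("tikv client is shutting down; no more retries")
	}
}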