introduce a random jitter to region cache ttl (#1148)
* introduce a random jitter to region cache ttl

Signed-off-by: zyguan <[email protected]>

* refactor searching cached region

Signed-off-by: zyguan <[email protected]>

* observe load region by reason

Signed-off-by: zyguan <[email protected]>

* address the comment

Signed-off-by: zyguan <[email protected]>

---------

Signed-off-by: zyguan <[email protected]>
zyguan authored Feb 2, 2024
1 parent 70c148e commit 8b3b01e
Showing 8 changed files with 233 additions and 118 deletions.
218 changes: 154 additions & 64 deletions internal/locate/region_cache.go
@@ -56,6 +56,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/config/retry"
tikverr "github.com/tikv/client-go/v2/error"
@@ -81,9 +82,9 @@ import (
)

const (
- btreeDegree = 32
- invalidatedLastAccessTime = -1
- defaultRegionsPerBatch = 128
+ btreeDegree = 32
+ expiredTTL = -1
+ defaultRegionsPerBatch = 128
)

// LabelFilter returns false means label doesn't match, and will ignore this store.
@@ -121,6 +122,29 @@ func SetRegionCacheTTLSec(t int64) {
regionCacheTTLSec = t
}

// regionCacheTTLJitterSec is the max jitter time for region cache TTL.
var regionCacheTTLJitterSec int64 = 60

// SetRegionCacheTTLWithJitter sets region cache TTL with jitter. The real TTL is in range of [base, base+jitter).
func SetRegionCacheTTLWithJitter(base int64, jitter int64) {
regionCacheTTLSec = base
regionCacheTTLJitterSec = jitter
}

// nextTTL returns a random TTL in range [ts+base, ts+base+jitter). The input ts should be an epoch timestamp in seconds.
func nextTTL(ts int64) int64 {
jitter := int64(0)
if regionCacheTTLJitterSec > 0 {
jitter = rand.Int63n(regionCacheTTLJitterSec)
}
return ts + regionCacheTTLSec + jitter
}

// nextTTLWithoutJitter is used for test.
func nextTTLWithoutJitter(ts int64) int64 {
return ts + regionCacheTTLSec
}
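
The two helpers above are the whole jitter mechanism: every refresh draws a fresh deadline uniformly from [ts+base, ts+base+jitter), so regions cached at the same moment stop expiring (and reloading from PD) in the same second. A minimal standalone sketch of the effect; the constants and names here are illustrative assumptions, not the library defaults:

```go
package main

import (
	"fmt"
	"math/rand"
)

const (
	baseTTLSec   = 600 // assumed values for illustration only
	jitterTTLSec = 60
)

// jitteredTTL mirrors nextTTL above: a deadline in [ts+base, ts+base+jitter).
func jitteredTTL(ts int64) int64 {
	return ts + baseTTLSec + rand.Int63n(jitterTTLSec)
}

func main() {
	now := int64(1_700_000_000)
	hist := make(map[int64]int)
	for i := 0; i < 100_000; i++ {
		hist[jitteredTTL(now)-now]++ // every offset lands in [600, 660)
	}
	fmt.Printf("expirations spread over %d distinct seconds\n", len(hist)) // prints 60
}
```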

const (
needReloadOnAccess int32 = 1 << iota // indicates the region will be reloaded on next access
needExpireAfterTTL // indicates the region will expire after RegionCacheTTL (even when it's accessed continuously)
@@ -150,11 +174,30 @@ const (
Other
)

func (r InvalidReason) String() string {
switch r {
case Ok:
return "Ok"
case Other:
return "Other"
case EpochNotMatch:
return "EpochNotMatch"
case RegionNotFound:
return "RegionNotFound"
case StoreNotFound:
return "StoreNotFound"
case NoLeader:
return "NoLeader"
default:
return "Unknown"
}
}

// Region presents kv region
type Region struct {
meta *metapb.Region // raw region meta from PD, immutable after init
store unsafe.Pointer // point to region store info, see RegionStore
- lastAccess int64 // last region access time, see checkRegionCacheTTL
+ ttl int64 // region TTL in epoch seconds, see checkRegionCacheTTL
syncFlags int32 // region need be sync later, see needReloadOnAccess, needExpireAfterTTL
invalidReason InvalidReason // the reason why the region is invalidated
}
@@ -338,7 +381,7 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
}

// mark region has been init accessed.
- r.lastAccess = time.Now().Unix()
+ r.ttl = nextTTL(time.Now().Unix())
return r, nil
}

Expand All @@ -356,8 +399,7 @@ func (r *Region) compareAndSwapStore(oldStore, newStore *regionStore) bool {
}

func (r *Region) isCacheTTLExpired(ts int64) bool {
- lastAccess := atomic.LoadInt64(&r.lastAccess)
- return ts-lastAccess > regionCacheTTLSec
+ return ts > atomic.LoadInt64(&r.ttl)
}

// checkRegionCacheTTL returns false means the region cache is expired.
@@ -366,28 +408,36 @@ func (r *Region) checkRegionCacheTTL(ts int64) bool {
if _, err := util.EvalFailpoint("invalidateRegionCache"); err == nil {
r.invalidate(Other)
}
+ newTTL := int64(0)
for {
- lastAccess := atomic.LoadInt64(&r.lastAccess)
- if ts-lastAccess > regionCacheTTLSec {
+ ttl := atomic.LoadInt64(&r.ttl)
+ if ts > ttl {
return false
}
- if r.checkSyncFlags(needExpireAfterTTL) || atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) {
+ // skip updating TTL when:
+ // 1. the region has been marked as `needExpireAfterTTL`
+ // 2. the TTL is far away from ts (still within jitter time)
+ if r.checkSyncFlags(needExpireAfterTTL) || ttl > ts+regionCacheTTLSec {
return true
}
+ if newTTL == 0 {
+ newTTL = nextTTL(ts)
+ }
+ // now we have ts <= ttl <= ts+regionCacheTTLSec <= newTTL = ts+regionCacheTTLSec+randomJitter
+ if atomic.CompareAndSwapInt64(&r.ttl, ttl, newTTL) {
+ return true
+ }
}
}
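
checkRegionCacheTTL is a textbook lock-free read-modify-write loop: load the TTL, fail if expired, skip the write while the deadline is still deep inside the jitter window, otherwise CAS in a new deadline and retry on contention. A self-contained sketch of the same retry shape, with hypothetical names and assuming epoch-second deadlines:

```go
package ttl

import "sync/atomic"

// extendDeadline pushes a shared deadline out to newDeadline unless it has
// already passed, retrying the CAS whenever a concurrent writer intervenes.
func extendDeadline(deadline *int64, now, newDeadline int64) bool {
	for {
		cur := atomic.LoadInt64(deadline)
		if now > cur {
			return false // expired: never resurrect a dead entry
		}
		if cur >= newDeadline {
			return true // already extended far enough; skip the write
		}
		if atomic.CompareAndSwapInt64(deadline, cur, newDeadline) {
			return true
		}
		// CAS lost to a concurrent writer: re-read and re-evaluate
	}
}
```

The `ttl > ts+regionCacheTTLSec` early-out in the real loop plays the same role as the `cur >= newDeadline` check here: hot regions skip the CAS entirely, avoiding write contention on the ttl field.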

// invalidate invalidates a region, next time it will got null result.
- func (r *Region) invalidate(reason InvalidReason) {
- metrics.RegionCacheCounterWithInvalidateRegionFromCacheOK.Inc()
- atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
- atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
- }
-
- // invalidateWithoutMetrics invalidates a region without metrics, next time it will got null result.
- func (r *Region) invalidateWithoutMetrics(reason InvalidReason) {
- atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
- atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
+ func (r *Region) invalidate(reason InvalidReason, nocount ...bool) {
+ if atomic.CompareAndSwapInt32((*int32)(&r.invalidReason), int32(Ok), int32(reason)) {
+ if len(nocount) == 0 || !nocount[0] {
+ metrics.RegionCacheCounterWithInvalidateRegionFromCacheOK.Inc()
+ }
+ atomic.StoreInt64(&r.ttl, expiredTTL)
+ }
}
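
The rewrite folds invalidateWithoutMetrics into a variadic `nocount` flag, and the CompareAndSwapInt32 from `Ok` makes invalidation first-writer-wins: the reason is recorded once and the metric fires at most once per region, no matter how many goroutines race. A distilled sketch of that shape (illustrative names only):

```go
package cache

import "sync/atomic"

// entry moves exactly once from ok (0) to a non-zero reason; only the
// winner of the CAS runs one-shot side effects such as metric increments.
type entry struct{ reason int32 }

func (e *entry) invalidate(reason int32, nocount ...bool) bool {
	if !atomic.CompareAndSwapInt32(&e.reason, 0, reason) {
		return false // already invalidated: keep the first reason, count nothing
	}
	if len(nocount) == 0 || !nocount[0] {
		// one-shot side effect, e.g. incrementing an invalidation counter
	}
	return true
}
```

Call sites that used the deleted helper now pass the flag instead, as insertRegionToCache below does with `invalidate(Other, !shouldCount)`.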

func (r *Region) getSyncFlags() int32 {
@@ -1068,9 +1118,15 @@ func (c *RegionCache) LocateEndKey(bo *retry.Backoffer, key []byte) (*KeyLocatio
}

func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey bool) (r *Region, err error) {
- r = c.searchCachedRegion(key, isEndKey)
- if r == nil {
+ var expired bool
+ r, expired = c.searchCachedRegionByKey(key, isEndKey)
+ tag := "ByKey"
+ if isEndKey {
+ tag = "ByEndKey"
+ }
+ if r == nil || expired {
+ // load region when it is not exists or expired.
+ observeLoadRegion(tag, r, expired, 0)
lr, err := c.loadRegion(bo, key, isEndKey, pd.WithAllowFollowerHandle())
if err != nil {
// no region data, return error if failure.
Expand All @@ -1083,6 +1139,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
c.mu.Unlock()
// just retry once, it won't bring much overhead.
if stale {
+ observeLoadRegion(tag+":Retry", r, expired, 0)
lr, err = c.loadRegion(bo, key, isEndKey)
if err != nil {
// no region data, return error if failure.
@@ -1101,8 +1158,10 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
err error
)
if reloadOnAccess {
+ observeLoadRegion(tag, r, expired, flags)
lr, err = c.loadRegion(bo, key, isEndKey)
} else {
observeLoadRegion("ByID", r, expired, flags)
lr, err = c.loadRegionByID(bo, r.GetID())
}
if err != nil {
@@ -1122,8 +1181,9 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
}

func (c *RegionCache) tryFindRegionByKey(key []byte, isEndKey bool) (r *Region) {
- r = c.searchCachedRegion(key, isEndKey)
- if r == nil || r.checkSyncFlags(needReloadOnAccess) {
+ var expired bool
+ r, expired = c.searchCachedRegionByKey(key, isEndKey)
+ if r == nil || expired || r.checkSyncFlags(needReloadOnAccess) {
return nil
}
return r
Expand Down Expand Up @@ -1242,12 +1302,11 @@ func (c *RegionCache) OnSendFail(bo *retry.Backoffer, ctx *RPCContext, scheduleR

// LocateRegionByID searches for the region with ID.
func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*KeyLocation, error) {
- c.mu.RLock()
- r := c.getRegionByIDFromCache(regionID)
- c.mu.RUnlock()
- if r != nil {
- if flags := r.resetSyncFlags(needReloadOnAccess); flags > 0 {
+ r, expired := c.searchCachedRegionByID(regionID)
+ if r != nil && !expired {
+ if flags := r.resetSyncFlags(needReloadOnAccess | needDelayedReloadReady); flags > 0 {
reloadOnAccess := flags&needReloadOnAccess > 0
+ observeLoadRegion("ByID", r, expired, flags)
lr, err := c.loadRegionByID(bo, regionID)
if err != nil {
// ignore error and use old region info.
@@ -1269,6 +1328,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
return loc, nil
}

+ observeLoadRegion("ByID", r, expired, 0)
r, err := c.loadRegionByID(bo, regionID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1500,11 +1560,7 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
- if shouldCount {
- region.cachedRegion.invalidate(Other)
- } else {
- region.cachedRegion.invalidateWithoutMetrics(Other)
- }
+ region.cachedRegion.invalidate(Other, !shouldCount)
}
}
// update related vars.
@@ -1513,47 +1569,33 @@
return true
}

- // searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,
- // it should be called with c.mu.RLock(), and the returned Region should not be
- // used after c.mu is RUnlock().
- // If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
- // when processing in reverse order.
- func (c *RegionCache) searchCachedRegion(key []byte, isEndKey bool) *Region {
- ts := time.Now().Unix()
- var r *Region
+ // searchCachedRegionByKey finds the region from cache by key.
+ func (c *RegionCache) searchCachedRegionByKey(key []byte, isEndKey bool) (*Region, bool) {
c.mu.RLock()
- r = c.mu.sorted.DescendLessOrEqual(key, isEndKey, ts)
+ region := c.mu.sorted.SearchByKey(key, isEndKey)
c.mu.RUnlock()
- if r != nil && (!isEndKey && r.Contains(key) || isEndKey && r.ContainsByEnd(key)) {
- return r
+ if region == nil {
+ return nil, false
}
- return nil
+ return region, !region.checkRegionCacheTTL(time.Now().Unix())
}
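
Both lookup helpers now return a (region, expired) pair instead of collapsing expiry into nil, which is what lets callers label reloads as Missing versus the Expired:* reasons. A simplified sketch of the calling pattern; `lookupOrLoad` is a hypothetical in-package wrapper, and the real findRegionByKey above additionally handles sync flags and the stale-insert retry:

```go
// Hypothetical caller: distinguishes a cold miss from a stale hit.
func (c *RegionCache) lookupOrLoad(bo *retry.Backoffer, key []byte) (*Region, error) {
	r, expired := c.searchCachedRegionByKey(key, false)
	if r != nil && !expired {
		return r, nil // fresh hit; the lookup already refreshed the TTL
	}
	// nil vs expired is now observable, so the reload metric gets the right reason
	observeLoadRegion("ByKey", r, expired, 0)
	return c.loadRegion(bo, key, false)
}
```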

- // getRegionByIDFromCache tries to get region by regionID from cache. Like
- // `getCachedRegion`, it should be called with c.mu.RLock(), and the returned
- // Region should not be used after c.mu is RUnlock().
- func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
- ts := time.Now().Unix()
+ // searchCachedRegionByID finds the region from cache by id.
+ func (c *RegionCache) searchCachedRegionByID(regionID uint64) (*Region, bool) {
+ c.mu.RLock()
ver, ok := c.mu.latestVersions[regionID]
if !ok {
- return nil
+ c.mu.RUnlock()
+ return nil, false
}
- latestRegion, ok := c.mu.regions[ver]
+ region, ok := c.mu.regions[ver]
+ c.mu.RUnlock()
if !ok {
// should not happen
- logutil.BgLogger().Warn("region version not found",
- zap.Uint64("regionID", regionID), zap.Stringer("version", &ver))
- return nil
- }
- lastAccess := atomic.LoadInt64(&latestRegion.lastAccess)
- if ts-lastAccess > regionCacheTTLSec {
- return nil
- }
- if !latestRegion.checkSyncFlags(needExpireAfterTTL) {
- atomic.CompareAndSwapInt64(&latestRegion.lastAccess, atomic.LoadInt64(&latestRegion.lastAccess), ts)
+ logutil.BgLogger().Warn("region not found", zap.Uint64("id", regionID), zap.Stringer("ver", &ver))
+ return nil, false
}
- return latestRegion
+ return region, !region.checkRegionCacheTTL(time.Now().Unix())
}

// GetStoresByType gets stores by type `typ`
@@ -1570,6 +1612,53 @@ func (c *RegionCache) GetAllStores() []*Store {
})
}

var loadRegionCounters sync.Map

const (
loadRegionReasonMissing = "Missing"
loadRegionReasonExpiredNormal = "Expired:Normal"
loadRegionReasonExpiredFrozen = "Expired:Frozen"
loadRegionReasonExpiredInvalid = "Expired:Invalid:"
loadRegionReasonReloadOnAccess = "Reload:OnAccess"
loadRegionReasonReloadDelayed = "Reload:Delayed"
loadRegionReasonUpdateBuckets = "UpdateBuckets"
loadRegionReasonUnknown = "Unknown"
)

func observeLoadRegion(tag string, region *Region, expired bool, reloadFlags int32, explicitReason ...string) {
reason := loadRegionReasonUnknown
if len(explicitReason) > 0 {
reason = strings.Join(explicitReason, ":")
} else if region == nil {
reason = loadRegionReasonMissing
} else if expired {
invalidReason := InvalidReason(atomic.LoadInt32((*int32)(&region.invalidReason)))
if invalidReason != Ok {
reason = loadRegionReasonExpiredInvalid + invalidReason.String()
} else if region.checkSyncFlags(needExpireAfterTTL) {
reason = loadRegionReasonExpiredFrozen
} else {
reason = loadRegionReasonExpiredNormal
}
} else if reloadFlags > 0 {
if reloadFlags&needReloadOnAccess > 0 {
reason = loadRegionReasonReloadOnAccess
} else if reloadFlags&needDelayedReloadReady > 0 {
reason = loadRegionReasonReloadDelayed
}
}
type key struct {
t string
r string
}
counter, ok := loadRegionCounters.Load(key{tag, reason})
if !ok {
counter = metrics.TiKVLoadRegionCounter.WithLabelValues(tag, reason)
loadRegionCounters.Store(key{tag, reason}, counter)
}
counter.(prometheus.Counter).Inc()
}
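
observeLoadRegion memoizes the per-label prometheus children in a sync.Map because CounterVec.WithLabelValues hashes its label values on every call, and this path sits on every region lookup. The Load-then-Store above is racy but benign: WithLabelValues returns the same child for identical labels, so the worst case is a redundant Store. A standalone sketch of the same memoization using LoadOrStore; the vector name and metric are assumptions for illustration:

```go
package metricsutil

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	loadRegionVec = prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "load_region_total"},
		[]string{"type", "reason"},
	)
	counterCache sync.Map // map[[2]string]prometheus.Counter
)

// cachedCounter resolves the labeled child once and reuses it afterwards.
func cachedCounter(typ, reason string) prometheus.Counter {
	key := [2]string{typ, reason}
	if c, ok := counterCache.Load(key); ok {
		return c.(prometheus.Counter)
	}
	c, _ := counterCache.LoadOrStore(key, loadRegionVec.WithLabelValues(typ, reason))
	return c.(prometheus.Counter)
}
```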

// loadRegion loads region from pd client, and picks the first peer as leader.
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
// when processing in reverse order.
@@ -2062,6 +2151,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
// TODO(youjiali1995): use singleflight.
go func() {
bo := retry.NewBackoffer(context.Background(), 20000)
+ observeLoadRegion("ByID", r, false, 0, loadRegionReasonUpdateBuckets)
new, err := c.loadRegionByID(bo, regionID.id)
if err != nil {
logutil.Logger(bo.GetCtx()).Error("failed to update buckets",