kvclient: cache the sorted list of replicas
Calling OptimizeReplicaOrder is expensive and can show up as a hotspot
in many high-QPS workloads. This commit caches the sorted replica list on
a per-range basis and updates the cache at most once every 3s.

Epic: none

Release note: None
andrewbaptist committed Nov 9, 2023
1 parent a8910e6 commit 4925b69
Showing 2 changed files with 106 additions and 45 deletions.
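As context for the diff below, here is a minimal, self-contained sketch of the caching pattern the commit message describes: lazily compute an expensive ordering, reuse it for a short window, and allow explicit invalidation. The names (expiringCache, get, invalidate) and the []int payload are hypothetical simplifications; the real implementation is EvictionToken.SortedReplicas in the rangecache diff further down, which caches two roachpb.ReplicaSets.

package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// expiringCache lazily computes a sorted list and reuses it until it expires
// or is explicitly invalidated. It mirrors the shape of the cache added in
// this commit, with a plain []int standing in for the two ReplicaSets.
type expiringCache struct {
	mu         sync.Mutex
	expiration time.Time
	sorted     []int
}

// get returns the cached sorted list, recomputing it via compute at most once
// per validFor interval.
func (c *expiringCache) get(validFor time.Duration, compute func() []int) []int {
	now := time.Now()
	c.mu.Lock()
	defer c.mu.Unlock()
	if now.After(c.expiration) {
		c.sorted = compute()
		c.expiration = now.Add(validFor)
	}
	return c.sorted
}

// invalidate forces the next get to recompute, analogous to
// clearSortedReplicas on a leaseholder or descriptor change.
func (c *expiringCache) invalidate() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.expiration = time.Now()
}

func main() {
	var computeCalls int
	c := &expiringCache{}
	compute := func() []int {
		computeCalls++
		vals := []int{3, 1, 2}
		sort.Ints(vals) // stand-in for the expensive OptimizeReplicaOrder work
		return vals
	}
	for i := 0; i < 3; i++ {
		_ = c.get(3*time.Second, compute) // compute runs only on the first call
	}
	c.invalidate()
	_ = c.get(3*time.Second, compute)           // recomputed after invalidation
	fmt.Println("compute calls:", computeCalls) // prints 2
}

With a 3s window this bounds the sort to at most one run per range per interval, plus one run after each invalidation, which is the behavior the commit message targets.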
96 changes: 51 additions & 45 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -2201,6 +2201,8 @@ func (ds *DistSender) sendToReplicas(
ctx context.Context, ba *kvpb.BatchRequest, routing rangecache.EvictionToken, withCommit bool,
) (*kvpb.BatchResponse, error) {

desc := routing.Desc()

// If this request can be sent to a follower to perform a consistent follower
// read under the closed timestamp, promote its routing policy to NEAREST.
// If we don't know the closed timestamp policy, we ought to optimistically
@@ -2214,64 +2216,67 @@ func (ds *DistSender) sendToReplicas(
ba = ba.ShallowCopy()
ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST
}
// Filter the replicas to only those that are relevant to the routing policy.

// The cacheDuration is set to the RPCHeartbeatTimeout since that is generally
// how fast we want to react to changes in the system. This could safely be
// tuned higher if it still shows up in hot path analysis. On a leaseholder or
// replica configuration change, the cache is also invalidated.
cacheDuration := base.DefaultRPCHeartbeatTimeout
// Generate or load the cached descriptors based on whether we are routing to
// NEAREST or LEASEHOLDER.
// NB: When changing the leaseholder policy, constraint_status_report should
// be updated appropriately.
var replicaFilter ReplicaSliceFilter
switch ba.RoutingPolicy {
case kvpb.RoutingPolicy_LEASEHOLDER:
replicaFilter = OnlyPotentialLeaseholders
case kvpb.RoutingPolicy_NEAREST:
replicaFilter = AllExtantReplicas
default:
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
}
desc := routing.Desc()
leaseholder := routing.Leaseholder()
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, replicaFilter)
if err != nil {
return nil, err
}
replicas, err := routing.SortedReplicas(
ba.RoutingPolicy == kvpb.RoutingPolicy_NEAREST,
cacheDuration,
func() (roachpb.ReplicaSet, roachpb.ReplicaSet, error) {
leaseholder := routing.Leaseholder()
leaseholderSet, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, OnlyPotentialLeaseholders)
if err != nil {
return roachpb.ReplicaSet{}, roachpb.ReplicaSet{}, err
}

// Rearrange the replicas so that they're ordered according to the routing
// policy.
var leaseholderFirst bool
switch ba.RoutingPolicy {
case kvpb.RoutingPolicy_LEASEHOLDER:
// First order by latency, then move the leaseholder to the front of the
// list, if it is known.
if !ds.dontReorderReplicas {
replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.latencyFunc, ds.locality)
}
// Rearrange the replicas so that they're ordered according to the routing
// policy.
// First order by latency, then move the leaseholder to the front of the
// list, if it is known.
if !ds.dontReorderReplicas {
leaseholderSet.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.latencyFunc, ds.locality)
}

idx := -1
if leaseholder != nil {
idx = replicas.Find(leaseholder.ReplicaID)
}
if idx != -1 {
replicas.MoveToFront(idx)
leaseholderFirst = true
} else {
// The leaseholder node's info must have been missing from gossip when we
// created replicas.
log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not known")
}
idx := -1
if leaseholder != nil {
idx = leaseholderSet.Find(leaseholder.ReplicaID)
}
if idx != -1 {
leaseholderSet.MoveToFront(idx)
} else {
// The leaseholder node's info must have been missing from gossip when we
// created replicas.
log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not known")
}

case kvpb.RoutingPolicy_NEAREST:
// Order by latency.
log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.latencyFunc, ds.locality)
// Order by latency.
followerSet, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, AllExtantReplicas)
if err != nil {
return roachpb.ReplicaSet{}, roachpb.ReplicaSet{}, err
}
log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
followerSet.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.latencyFunc, ds.locality)

default:
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
// Convert to ReplicaSet; we no longer need any of the sorting information.
return leaseholderSet.AsReplicaSet(), followerSet.AsReplicaSet(), nil
})
if err != nil {
return nil, err
}

opts := SendOptions{
class: rpc.ConnectionClassForKey(desc.RSpan().Key),
metrics: &ds.metrics,
dontConsiderConnHealth: ds.dontConsiderConnHealth,
}
transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas.AsReplicaSet())
transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas)
if err != nil {
return nil, err
}
@@ -2583,6 +2588,7 @@ func (ds *DistSender) sendToReplicas(
// have a sufficient closed timestamp. In response, we should
// immediately redirect to the leaseholder, without a backoff
// period.
leaseholderFirst := routing.Leaseholder() != nil && replicas.First().ReplicaID == routing.Leaseholder().ReplicaID
intentionallySentToFollower := first && !leaseholderFirst
// See if we want to backoff a little before the next attempt. If
// the lease info we got is stale and we were intending to send to
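To make the caller-side contract above easier to follow, the following hypothetical sketch uses simplified stand-in types (replicaInfo, pickOrdering, and leaseholderFirst are illustrative names, not the real API). It shows the two things the dist_sender.go change does with the cached result: the follower flag passed to SortedReplicas selects between the two pre-sorted lists, and leaseholderFirst is now re-derived from the front of the returned list rather than tracked while sorting.

package main

import "fmt"

// replicaInfo is a simplified stand-in for roachpb.ReplicaDescriptor.
type replicaInfo struct{ replicaID int }

// pickOrdering mirrors the selection inside EvictionToken.SortedReplicas: the
// caller passes follower == (RoutingPolicy == NEAREST) and gets back the
// matching pre-sorted list.
func pickOrdering(follower bool, leaseholderOrder, nearestOrder []replicaInfo) []replicaInfo {
	if follower {
		return nearestOrder
	}
	return leaseholderOrder
}

// leaseholderFirst mirrors the re-derivation in sendToReplicas: with the
// ordering now cached, the DistSender checks whether the known leaseholder
// ended up at the front of the list instead of tracking a bool while sorting.
func leaseholderFirst(replicas []replicaInfo, leaseholder *replicaInfo) bool {
	return leaseholder != nil && len(replicas) > 0 &&
		replicas[0].replicaID == leaseholder.replicaID
}

func main() {
	leaseholder := &replicaInfo{replicaID: 1}
	// Hypothetical outputs of the compute closure: one leaseholder-first
	// ordering and one latency-based ("follower read") ordering.
	lhOrder := []replicaInfo{{1}, {2}, {3}}
	nearOrder := []replicaInfo{{2}, {1}, {3}}

	forLeaseholder := pickOrdering(false /* follower */, lhOrder, nearOrder)
	forNearest := pickOrdering(true /* follower */, lhOrder, nearOrder)

	fmt.Println(leaseholderFirst(forLeaseholder, leaseholder)) // true
	fmt.Println(leaseholderFirst(forNearest, leaseholder))     // false
}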
55 changes: 55 additions & 0 deletions pkg/kv/kvclient/rangecache/range_cache.go
@@ -260,6 +260,18 @@ func (rc *RangeCache) stringLocked() string {
return buf.String()
}

// sortedReplicaSets is a cached view of the sorted replica sets for this
// EvictionToken. Computing the best replica ordering for a range can be
// expensive, so we lazily compute and cache it. Two ReplicaSets are stored
// for each EvictionToken: one with a leaseholder-first order, and the other
// using "follower read" sorting.
type sortedReplicaSets struct {
mu syncutil.RWMutex
expiration time.Time
leaseholder roachpb.ReplicaSet
follower roachpb.ReplicaSet
}

// EvictionToken holds eviction state between calls to Lookup.
type EvictionToken struct {
// rdc is the cache that produced this token - and that will be modified by
@@ -274,6 +286,10 @@ type EvictionToken struct {
desc *roachpb.RangeDescriptor
lease *roachpb.Lease
closedts roachpb.RangeClosedTimestampPolicy
// sorted is an optimization that stores the sorted ReplicaSets for both
// leaseholder and follower reads. Computing this ordering shows up in hot
// path analysis.
sorted *sortedReplicaSets

// speculativeDesc, if not nil, is the descriptor that should replace desc if
// desc proves to be stale - i.e. speculativeDesc is inserted in the cache
@@ -295,6 +311,41 @@ type EvictionToken struct {
speculativeDesc *roachpb.RangeDescriptor
}

// SortedReplicas returns the sorted replicas for this token, in either
// follower-read or leaseholder-first order. The lists are recomputed via the
// compute callback at most once per validFor interval.
func (et *EvictionToken) SortedReplicas(
follower bool,
validFor time.Duration,
compute func() (roachpb.ReplicaSet, roachpb.ReplicaSet, error),
) (roachpb.ReplicaSet, error) {
now := timeutil.Now()

et.sorted.mu.Lock()
defer et.sorted.mu.Unlock()
if now.After(et.sorted.expiration) {
// Refresh the sorted lists.
leaseholder, nearest, err := compute()
if err != nil {
return roachpb.ReplicaSet{}, err
}
et.sorted.leaseholder, et.sorted.follower = leaseholder, nearest
et.sorted.expiration = now.Add(validFor)
}
if follower {
return et.sorted.follower, nil
} else {
return et.sorted.leaseholder, nil
}
}

// clearSortedReplicas is called any time the EvictionToken changes, since the
// previously cached lists can no longer be used. By resetting the expiration
// to now, the next request will recreate the cached lists.
func (et *EvictionToken) clearSortedReplicas() {
et.sorted.mu.Lock()
defer et.sorted.mu.Unlock()
et.sorted.expiration = timeutil.Now()
}

func (rc *RangeCache) makeEvictionToken(
entry *CacheEntry, speculativeDesc *roachpb.RangeDescriptor,
) EvictionToken {
@@ -314,6 +365,7 @@ func (rc *RangeCache) makeEvictionToken(
lease: entry.leaseEvenIfSpeculative(),
closedts: entry.closedts,
speculativeDesc: speculativeDesc,
sorted: &sortedReplicaSets{},
}
}

@@ -415,6 +467,7 @@ func (et *EvictionToken) syncRLocked(
}
et.desc = cachedEntry.Desc()
et.lease = cachedEntry.leaseEvenIfSpeculative()
et.clearSortedReplicas()
return true, cachedEntry, rawEntry
}

@@ -490,6 +543,7 @@ func (et *EvictionToken) SyncTokenAndMaybeUpdateCache(
// range descriptor/lease information available in the RangeCache.
et.desc = newEntry.Desc()
et.lease = newEntry.leaseEvenIfSpeculative()
et.clearSortedReplicas()
return updatedLeaseholder
}

@@ -550,6 +604,7 @@ func (et *EvictionToken) EvictLease(ctx context.Context) {
et.desc = newEntry.Desc()
et.lease = newEntry.leaseEvenIfSpeculative()
et.rdc.swapEntryLocked(ctx, rawEntry, newEntry)
et.clearSortedReplicas()
}

func descsCompatible(a, b *roachpb.RangeDescriptor) bool {
