From b14615f401e03b2359d228e4b6a8be571b76190e Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Tue, 7 Nov 2023 17:56:39 -0500
Subject: [PATCH] kvclient: cache the sorted list of replicas

Calling OptimizeReplicaOrder is expensive and can show up as a hotspot
in many high QPS workloads. This PR caches the sorted list on a
per-range basis and updates the cache at most once every 3s.

Epic: none

Release note: None
---
 pkg/kv/kvclient/kvcoord/dist_sender.go    | 96 ++++++++++++-----------
 pkg/kv/kvclient/rangecache/range_cache.go | 55 +++++++++++++
 2 files changed, 106 insertions(+), 45 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index a177c1b744c8..2666c7c3585f 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -2239,6 +2239,8 @@ func (ds *DistSender) sendToReplicas(
 ctx context.Context, ba *kvpb.BatchRequest, routing rangecache.EvictionToken, withCommit bool,
 ) (*kvpb.BatchResponse, error) {
+ desc := routing.Desc()
+
 // If this request can be sent to a follower to perform a consistent follower
 // read under the closed timestamp, promote its routing policy to NEAREST.
 // If we don't know the closed timestamp policy, we ought to optimistically
@@ -2252,56 +2254,59 @@ func (ds *DistSender) sendToReplicas(
 ba = ba.ShallowCopy()
 ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST
 }
- // Filter the replicas to only those that are relevant to the routing policy.
+
+ // The cacheDuration is set to the RPCHeartbeatTimeout since that is generally
+ // how fast we want to react to changes in the system. This could safely be
+ // tuned higher if it still shows up in hot path analysis. On a leaseholder or
+ // replica configuration change, the cache is also invalidated.
+ cacheDuration := base.DefaultRPCHeartbeatTimeout
+ // Generate or load the cached descriptors based on whether we are routing to
+ // NEAREST or LEASEHOLDER.
 // NB: When changing leaseholder policy constraint_status_report should be
 // updated appropriately.
- var replicaFilter ReplicaSliceFilter
- switch ba.RoutingPolicy {
- case kvpb.RoutingPolicy_LEASEHOLDER:
- replicaFilter = OnlyPotentialLeaseholders
- case kvpb.RoutingPolicy_NEAREST:
- replicaFilter = AllExtantReplicas
- default:
- log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
- }
- desc := routing.Desc()
- leaseholder := routing.Leaseholder()
- replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, replicaFilter)
- if err != nil {
- return nil, err
- }
+ replicas, err := routing.SortedReplicas(
+ ba.RoutingPolicy == kvpb.RoutingPolicy_NEAREST,
+ cacheDuration,
+ func() (roachpb.ReplicaSet, roachpb.ReplicaSet, error) {
+ leaseholder := routing.Leaseholder()
+ leaseholderSet, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, OnlyPotentialLeaseholders)
+ if err != nil {
+ return roachpb.ReplicaSet{}, roachpb.ReplicaSet{}, err
+ }
- // Rearrange the replicas so that they're ordered according to the routing
- // policy.
- var leaseholderFirst bool
- switch ba.RoutingPolicy {
- case kvpb.RoutingPolicy_LEASEHOLDER:
- // First order by latency, then move the leaseholder to the front of the
- // list, if it is known.
- if !ds.dontReorderReplicas {
- replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality)
- }
+ // Rearrange the replicas so that they're ordered according to the routing
+ // policy.
+ // First order by latency, then move the leaseholder to the front of the
+ // list, if it is known.
+ if !ds.dontReorderReplicas {
+ leaseholderSet.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.HealthFunc(), ds.latencyFunc, ds.locality)
+ }
- idx := -1
- if leaseholder != nil {
- idx = replicas.Find(leaseholder.ReplicaID)
- }
- if idx != -1 {
- replicas.MoveToFront(idx)
- leaseholderFirst = true
- } else {
- // The leaseholder node's info must have been missing from gossip when we
- // created replicas.
- log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not known")
- }
+ idx := -1
+ if leaseholder != nil {
+ idx = leaseholderSet.Find(leaseholder.ReplicaID)
+ }
+ if idx != -1 {
+ leaseholderSet.MoveToFront(idx)
+ } else {
+ // The leaseholder node's info must have been missing from gossip when we
+ // created replicas.
+ log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not known")
+ }
- case kvpb.RoutingPolicy_NEAREST:
- // Order by latency.
- log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
- replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality)
+ // Order by latency.
+ followerSet, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, AllExtantReplicas)
+ if err != nil {
+ return roachpb.ReplicaSet{}, roachpb.ReplicaSet{}, err
+ }
+ log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
+ followerSet.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.HealthFunc(), ds.latencyFunc, ds.locality)
- default:
- log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
+ // Convert to ReplicaSet; we no longer need any of the sorting information.
+ return leaseholderSet.AsReplicaSet(), followerSet.AsReplicaSet(), nil
+ })
+ if err != nil {
+ return nil, err
 }
 
 opts := SendOptions{
@@ -2309,7 +2314,7 @@ func (ds *DistSender) sendToReplicas(
 metrics: &ds.metrics,
 dontConsiderConnHealth: ds.dontConsiderConnHealth,
 }
- transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas.AsReplicaSet())
+ transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas)
 if err != nil {
 return nil, err
 }
@@ -2621,6 +2626,7 @@ func (ds *DistSender) sendToReplicas(
 // have a sufficient closed timestamp. In response, we should
 // immediately redirect to the leaseholder, without a backoff
 // period.
+ leaseholderFirst := routing.Leaseholder() != nil && replicas.First().ReplicaID == routing.Leaseholder().ReplicaID
 intentionallySentToFollower := first && !leaseholderFirst
 // See if we want to backoff a little before the next attempt. If
 // the lease info we got is stale and we were intending to send to
diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go
index 63fd541ff0dc..e18bcfbe1e42 100644
--- a/pkg/kv/kvclient/rangecache/range_cache.go
+++ b/pkg/kv/kvclient/rangecache/range_cache.go
@@ -260,6 +260,18 @@ func (rc *RangeCache) stringLocked() string {
 return buf.String()
 }
 
+// sortedReplicaSets is a cached view of sorted descriptors for this
+// EvictionToken. Computing the replica ordering for a range can be expensive,
+// so we lazily compute and cache it. There are two ReplicaSets stored for each
+// EvictionToken: one with a leaseholder-first order and the other using
+// "follower read" sorting.
+type sortedReplicaSets struct {
+ mu syncutil.RWMutex
+ expiration time.Time
+ leaseholder roachpb.ReplicaSet
+ follower roachpb.ReplicaSet
+}
+
 // EvictionToken holds eviction state between calls to Lookup.
 type EvictionToken struct {
 // rdc is the cache that produced this token - and that will be modified by
@@ -274,6 +286,10 @@ type EvictionToken struct {
 desc *roachpb.RangeDescriptor
 lease *roachpb.Lease
 closedts roachpb.RangeClosedTimestampPolicy
+ // sorted is an optimization to store the list of sorted ReplicaSets for both
+ // leaseholder and follower reads. The sorting of these lists shows up in hot
+ // path analysis.
+ sorted *sortedReplicaSets
 
 // speculativeDesc, if not nil, is the descriptor that should replace desc if
 // desc proves to be stale - i.e. speculativeDesc is inserted in the cache
@@ -295,6 +311,41 @@ type EvictionToken struct {
 speculativeDesc *roachpb.RangeDescriptor
 }
 
+// SortedReplicas returns a list of sorted replicas for this token.
+func (et *EvictionToken) SortedReplicas(
+ follower bool,
+ validFor time.Duration,
+ compute func() (roachpb.ReplicaSet, roachpb.ReplicaSet, error),
+) (roachpb.ReplicaSet, error) {
+ now := timeutil.Now()
+
+ et.sorted.mu.Lock()
+ defer et.sorted.mu.Unlock()
+ if now.After(et.sorted.expiration) {
+ // Refresh the sorted lists.
+ leaseholder, nearest, err := compute()
+ if err != nil {
+ return roachpb.ReplicaSet{}, err
+ }
+ et.sorted.leaseholder, et.sorted.follower = leaseholder, nearest
+ et.sorted.expiration = now.Add(validFor)
+ }
+ if follower {
+ return et.sorted.follower, nil
+ } else {
+ return et.sorted.leaseholder, nil
+ }
+}
+
+// clearSortedReplicas is called any time the EvictionToken changes since the
+// previously cached lists can no longer be used. By moving the expiration time
+// back to now, the next request will recreate the cached lists.
+func (et *EvictionToken) clearSortedReplicas() {
+ et.sorted.mu.Lock()
+ defer et.sorted.mu.Unlock()
+ et.sorted.expiration = timeutil.Now()
+}
+
 func (rc *RangeCache) makeEvictionToken(
 entry *CacheEntry, speculativeDesc *roachpb.RangeDescriptor,
 ) EvictionToken {
@@ -314,6 +365,7 @@ func (rc *RangeCache) makeEvictionToken(
 lease: entry.leaseEvenIfSpeculative(),
 closedts: entry.closedts,
 speculativeDesc: speculativeDesc,
+ sorted: &sortedReplicaSets{},
 }
 }
 
@@ -415,6 +467,7 @@ func (et *EvictionToken) syncRLocked(
 }
 et.desc = cachedEntry.Desc()
 et.lease = cachedEntry.leaseEvenIfSpeculative()
+ et.clearSortedReplicas()
 return true, cachedEntry, rawEntry
 }
 
@@ -490,6 +543,7 @@ func (et *EvictionToken) SyncTokenAndMaybeUpdateCache(
 // range descriptor/lease information available in the RangeCache.
 et.desc = newEntry.Desc()
 et.lease = newEntry.leaseEvenIfSpeculative()
+ et.clearSortedReplicas()
 return updatedLeaseholder
 }
 
@@ -550,6 +604,7 @@ func (et *EvictionToken) EvictLease(ctx context.Context) {
 et.desc = newEntry.Desc()
 et.lease = newEntry.leaseEvenIfSpeculative()
 et.rdc.swapEntryLocked(ctx, rawEntry, newEntry)
+ et.clearSortedReplicas()
 }
 
 func descsCompatible(a, b *roachpb.RangeDescriptor) bool {
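
The heart of the change is the small expiring cache behind EvictionToken.SortedReplicas: both orderings are computed under a mutex at most once per validFor window, and clearSortedReplicas invalidates them by moving the expiration back to the current time. Below is a minimal, self-contained sketch of that pattern for experimenting with it outside the tree. The names sortedCache, Get, and Invalidate, and the []string orderings, are illustrative stand-ins rather than the CockroachDB API; the real code stores roachpb.ReplicaSet values and uses syncutil and timeutil.

package main

import (
	"fmt"
	"sync"
	"time"
)

// sortedCache mirrors the shape of sortedReplicaSets: two precomputed
// orderings guarded by a mutex, plus an expiration time.
type sortedCache struct {
	mu          sync.Mutex
	expiration  time.Time
	leaseholder []string // leaseholder-first ordering
	follower    []string // follower-read ("nearest") ordering
}

// Get returns the requested ordering, recomputing both orderings via compute
// once the cached copies are older than validFor.
func (c *sortedCache) Get(
	follower bool,
	validFor time.Duration,
	compute func() (leaseholder, nearest []string, err error),
) ([]string, error) {
	now := time.Now()
	c.mu.Lock()
	defer c.mu.Unlock()
	if now.After(c.expiration) {
		lh, nearest, err := compute()
		if err != nil {
			return nil, err
		}
		c.leaseholder, c.follower = lh, nearest
		c.expiration = now.Add(validFor)
	}
	if follower {
		return c.follower, nil
	}
	return c.leaseholder, nil
}

// Invalidate forces the next Get to recompute by expiring the cached entries,
// the same way clearSortedReplicas does.
func (c *sortedCache) Invalidate() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.expiration = time.Now()
}

func main() {
	computes := 0
	compute := func() ([]string, []string, error) {
		computes++ // stands in for the expensive OptimizeReplicaOrder calls
		return []string{"n1 (leaseholder)", "n2", "n3"},
			[]string{"n2 (nearest)", "n1", "n3"}, nil
	}

	c := &sortedCache{}
	for i := 0; i < 3; i++ {
		order, _ := c.Get(false /* follower */, 3*time.Second, compute)
		fmt.Println(order)
	}
	fmt.Println("computes:", computes) // 1: the later Gets hit the cache

	c.Invalidate()
	_, _ = c.Get(true /* follower */, 3*time.Second, compute)
	fmt.Println("computes after invalidation:", computes) // 2 once the clock has advanced
}

As in the patch, invalidation only resets the expiration: a caller that has already passed the expiration check finishes with the lists it has, and the next caller recomputes them.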
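
For LEASEHOLDER routing, the compute closure first orders the replicas by latency and then moves the known leaseholder to the front of the list. The sketch below shows only that reordering step, with an invented replica type and hard-coded latencies; it is a simplified stand-in, not OptimizeReplicaOrder itself, which also takes settings, locality, and node health into account.

package main

import (
	"fmt"
	"sort"
	"time"
)

// replica is a toy stand-in for a replica descriptor plus its measured latency.
type replica struct {
	id      int
	latency time.Duration
}

// leaseholderFirst returns the replicas ordered by latency with the
// leaseholder (if present) moved to the front, preserving the relative
// latency order of the remaining replicas.
func leaseholderFirst(replicas []replica, leaseholderID int) []replica {
	out := append([]replica(nil), replicas...) // don't mutate the caller's slice
	sort.Slice(out, func(i, j int) bool { return out[i].latency < out[j].latency })
	for i, r := range out {
		if r.id == leaseholderID {
			// Shift everything before the leaseholder down one slot and put
			// the leaseholder at index 0.
			lh := out[i]
			copy(out[1:i+1], out[0:i])
			out[0] = lh
			break
		}
	}
	return out
}

func main() {
	rs := []replica{
		{id: 1, latency: 3 * time.Millisecond},
		{id: 2, latency: 1 * time.Millisecond},
		{id: 3, latency: 2 * time.Millisecond},
	}
	// Leaseholder 3 is not the closest replica, but it still ends up first.
	fmt.Println(leaseholderFirst(rs, 3)) // [{3 2ms} {2 1ms} {1 3ms}]
}

If the leaseholder is not known, the patch falls back to the plain latency-sorted order and logs that it is routing to the nearest replica.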