Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient: cache the sorted list of replicas #114033

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
24 changes: 12 additions & 12 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,12 +817,12 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
n4Cache := tc.Server(3).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
entry := n4Cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
require.False(t, entry.Lease.Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease.Replica.StoreID)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
}, entry.Desc().Replicas().Descriptors())
}, entry.Desc.Replicas().Descriptors())

// Remove the follower and add a new non-voter to n3. n2 will no longer have a
// replica.
Expand All @@ -840,12 +840,12 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
// Check that the cache was properly updated.
entry = n4Cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
require.False(t, entry.Lease.Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease.Replica.StoreID)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 3, StoreID: 3, ReplicaID: 3, Type: roachpb.NON_VOTER},
}, entry.Desc().Replicas().Descriptors())
}, entry.Desc.Replicas().Descriptors())

// Make a note of the follower reads metric on n3. We'll check that it was
// incremented.
Expand Down Expand Up @@ -883,12 +883,12 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
n3Cache := tc.Server(2).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
entry = n3Cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
require.False(t, entry.Lease.Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease.Replica.StoreID)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 3, StoreID: 3, ReplicaID: 3, Type: roachpb.NON_VOTER},
}, entry.Desc().Replicas().Descriptors())
}, entry.Desc.Replicas().Descriptors())

// Enable DistSQL so that we have a distributed plan with a single flow on
// n3 (local plans ignore the misplanned ranges).
Expand Down Expand Up @@ -1136,13 +1136,13 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
cache := tenants[gatewayNode].DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
entry := cache.GetCached(ctx, tablePrefix, false /* inverted */)
require.NotNil(t, entry)
require.False(t, entry.Lease().Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
require.False(t, entry.Lease.Empty())
require.Equal(t, roachpb.StoreID(1), entry.Lease.Replica.StoreID)
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
}, entry.Desc().Replicas().Descriptors())
}, entry.Desc.Replicas().Descriptors())

tenantSQL.Exec(t, historicalQuery)
rec := <-recCh
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/multiregionccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,10 @@ SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '5m'
cache := ds.tc.Server(idx).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
tablePrefix := keys.MustAddr(keys.SystemSQLCodec.TablePrefix(tableID))
entry := cache.GetCached(ctx, tablePrefix, false /* inverted */)
if entry == nil {
if !entry.Desc.IsInitialized() {
return errors.Newf("no entry found for %s in cache", tbName).Error()
}
return entry.ClosedTimestampPolicy().String()
return entry.ClosedTimestampPolicy.String()

case "wait-for-zone-config-changes":
lookupKey, err := getRangeKeyForInput(t, d, ds.tc)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/multiregionccl/roundtrips_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,14 @@ func TestEnsureLocalReadsOnGlobalTables(t *testing.T) {
// Check that the cache was indeed populated.
cache := tc.Server(i).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
entry := cache.GetCached(context.Background(), tablePrefix, false /* inverted */)
require.NotNil(t, entry.Lease().Empty())
require.NotNil(t, entry.Lease.Empty())
require.NotNil(t, entry)

if expected, got := roachpb.LEAD_FOR_GLOBAL_READS, entry.ClosedTimestampPolicy(); got != expected {
if expected, got := roachpb.LEAD_FOR_GLOBAL_READS, entry.ClosedTimestampPolicy; got != expected {
return errors.Newf("expected closedts policy %s, got %s", expected, got)
}

isLeaseHolder = entry.Lease().Replica.NodeID == tc.Server(i).NodeID()
isLeaseHolder = entry.Lease.Replica.NodeID == tc.Server(i).NodeID()
return nil
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
if r, err := b.rc.Lookup(ctx, k); err != nil {
log.Warningf(ctx, "failed to lookup range cache entry for key %v: %v", k, err)
} else {
k := r.Desc().EndKey.AsRawKey()
k := r.Desc.EndKey.AsRawKey()
b.flushKey = k
log.VEventf(ctx, 3, "%s building sstable that will flush before %v", b.name, k)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
for _, k := range []int{0, split1} {
ent, err := ds.RangeDescriptorCache().Lookup(ctx, keys.MustAddr(key(k)))
require.NoError(t, err)
mockCache.Insert(ctx, roachpb.RangeInfo{Desc: *ent.Desc()})
mockCache.Insert(ctx, ent)
}

t.Logf("splitting at %s", key(split2))
Expand Down
104 changes: 53 additions & 51 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,13 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
return
}
log.VEventf(ctx, 1, "gossiped first range descriptor: %+v", desc.Replicas())
ds.rangeCache.EvictByKey(ctx, roachpb.RKeyMin)
et, err := ds.rangeCache.LookupWithEvictionToken(ctx, roachpb.RKeyMin, rangecache.EvictionToken{}, false)
if err != nil {
et.EvictAndReplace(ctx, roachpb.RangeInfo{
Desc: *desc,
ClosedTimestampPolicy: rangecache.UnknownClosedTimestampPolicy,
})
}
})
}

Expand Down Expand Up @@ -2192,6 +2198,46 @@ func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error {
// value when populating the batch header.
const defaultSendClosedTimestampPolicy = roachpb.LEAD_FOR_GLOBAL_READS

// computeSortedReplicas builds the two replica orderings used for routing a
// batch to the given range: one for leaseholder routing (potential
// leaseholders only, leaseholder moved to the front when known) and one for
// nearest-replica routing (all extant replicas, ordered purely by the
// configured routing heuristics). The leaseholder-ordered set is returned
// first, the nearest-ordered set second.
func (ds *DistSender) computeSortedReplicas(
	ctx context.Context, desc *roachpb.RangeDescriptor, leaseholder *roachpb.ReplicaDescriptor,
) (roachpb.ReplicaSet, roachpb.ReplicaSet, error) {
	lhReplicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, OnlyPotentialLeaseholders)
	if err != nil {
		return roachpb.ReplicaSet{}, roachpb.ReplicaSet{}, err
	}

	// Sort the potential leaseholders by the routing policy (latency first),
	// unless reordering is disabled for testing.
	if !ds.dontReorderReplicas {
		lhReplicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.HealthFunc(), ds.latencyFunc, ds.locality)
	}

	// If the leaseholder is known and present in the slice, promote it to the
	// front; otherwise fall back to the latency ordering.
	lhIdx := -1
	if leaseholder != nil {
		lhIdx = lhReplicas.Find(leaseholder.ReplicaID)
	}
	if lhIdx >= 0 {
		lhReplicas.MoveToFront(lhIdx)
	} else {
		// The leaseholder node's info must have been missing from gossip when
		// the slice was built.
		log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not known")
	}

	// Build the nearest-replica ordering over all extant replicas, sorted
	// purely by latency/locality.
	nearestReplicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, AllExtantReplicas)
	if err != nil {
		return roachpb.ReplicaSet{}, roachpb.ReplicaSet{}, err
	}
	log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
	nearestReplicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.HealthFunc(), ds.latencyFunc, ds.locality)

	// Strip the sorting metadata; callers only need the ordered sets.
	return lhReplicas.AsReplicaSet(), nearestReplicas.AsReplicaSet(), nil
}

// sendToReplicas sends a batch to the replicas of a range. Replicas are tried one
// at a time (generally the leaseholder first). The result of this call is
// either a BatchResponse or an error. In the former case, the BatchResponse
Expand Down Expand Up @@ -2235,61 +2281,17 @@ func (ds *DistSender) sendToReplicas(
ba = ba.ShallowCopy()
ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST
}
// Filter the replicas to only those that are relevant to the routing policy.
// Generate or load the cached descriptors based on whether we are routing to
// NEAREST or LEASEHOLDER.
// NB: When changing leaseholder policy constraint_status_report should be
// updated appropriately.
var replicaFilter ReplicaSliceFilter
switch ba.RoutingPolicy {
case kvpb.RoutingPolicy_LEASEHOLDER:
replicaFilter = OnlyPotentialLeaseholders
case kvpb.RoutingPolicy_NEAREST:
replicaFilter = AllExtantReplicas
default:
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
var leaseholderFirst = ba.RoutingPolicy == kvpb.RoutingPolicy_LEASEHOLDER
replicas, stillValid := routing.SortedReplicas(ctx, leaseholderFirst, ds.computeSortedReplicas)
if !stillValid {
return nil, newSendError(errors.New("Unable to compute the sorted replica list"))
}
desc := routing.Desc()
leaseholder := routing.Leaseholder()
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, replicaFilter)
if err != nil {
return nil, err
}

// Rearrange the replicas so that they're ordered according to the routing
// policy.
var leaseholderFirst bool
switch ba.RoutingPolicy {
case kvpb.RoutingPolicy_LEASEHOLDER:
// First order by latency, then move the leaseholder to the front of the
// list, if it is known.
if !ds.dontReorderReplicas {
replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality)
}

idx := -1
if leaseholder != nil {
idx = replicas.Find(leaseholder.ReplicaID)
}
if idx != -1 {
replicas.MoveToFront(idx)
leaseholderFirst = true
} else {
// The leaseholder node's info must have been missing from gossip when we
// created replicas.
log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not known")
}

case kvpb.RoutingPolicy_NEAREST:
// Order by latency.
log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality)

default:
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
}

// NB: upgrade the connection class to SYSTEM, for critical ranges. Set it to
// DEFAULT if the class is unknown, to handle mixed-version states gracefully.
// Other kinds of overrides are possible, see rpc.ConnectionClassForKey().
opts := SendOptions{
class: rpc.ConnectionClassForKey(desc.RSpan().Key, ba.ConnectionClass),
metrics: &ds.metrics,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
}
getInterceptingTransportFactory := func(nID roachpb.NodeID) func(kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) {
return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) {
transport, tErr := factory(options, slice)
interceptor := &interceptingTransport{
Transport: transport,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ func newTransportForRange(
}
replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality)
opts := SendOptions{class: defRangefeedConnClass}
return ds.transportFactory(opts, replicas)
return ds.transportFactory(opts, replicas.AsReplicaSet())
}

// makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
NodeDescs: g,
RPCRetryOptions: &retry.Options{MaxRetries: 10},
Stopper: stopper,
TransportFactory: func(options SendOptions, slice ReplicaSlice) (Transport, error) {
TransportFactory: func(options SendOptions, slice roachpb.ReplicaSet) (Transport, error) {
return transport, nil
},
RangeDescriptorDB: rangeDB,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func makeTransportFactory(
rfStreamEnabled bool, counts *internalClientCounts, wrapFn wrapRangeFeedClientFn,
) func(kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) {
return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) {
transport, err := factory(options, slice)
if err != nil {
return nil, err
Expand Down
Loading