From e476a5c087ee85865268df2c7d0e49e612fcfaf5 Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Thu, 9 Nov 2023 09:24:18 -0500 Subject: [PATCH] kvclient: replace ReplicaSlice with ReplicaSet for routing This PR replaces ReplicaSlice with ReplicaSet outside of the DistSender. ReplicaSlice is an internal implementation which contains information only required for sorting the replicas. Outside of DistSender the additional sorting information is unnecessary and unused. Epic: none Informs: #112351 Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 2 +- .../kvcoord/dist_sender_ambiguous_test.go | 2 +- .../kvclient/kvcoord/dist_sender_rangefeed.go | 2 +- .../dist_sender_rangefeed_mock_test.go | 2 +- .../kvcoord/dist_sender_rangefeed_test.go | 2 +- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 8 +++---- .../kvcoord/local_test_cluster_util.go | 2 +- .../kvclient/kvcoord/replayed_commit_test.go | 2 +- pkg/kv/kvclient/kvcoord/replica_slice.go | 4 ++++ pkg/kv/kvclient/kvcoord/send_test.go | 8 +++---- pkg/kv/kvclient/kvcoord/transport.go | 22 +++++++++---------- pkg/kv/kvclient/kvcoord/transport_race.go | 3 ++- pkg/kv/kvclient/kvcoord/transport_regular.go | 7 ++++-- pkg/roachpb/metadata_replicas.go | 4 ++++ pkg/sql/ambiguous_commit_test.go | 2 +- 15 files changed, 42 insertions(+), 30 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 7024122177c5..a177c1b744c8 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2309,7 +2309,7 @@ func (ds *DistSender) sendToReplicas( metrics: &ds.metrics, dontConsiderConnHealth: ds.dontConsiderConnHealth, } - transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) + transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas.AsReplicaSet()) if err != nil { return nil, err } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go index 34c19b9924b0..df2946095cd5 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go @@ -297,7 +297,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { }, } getInterceptingTransportFactory := func(nID roachpb.NodeID) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, dialer *nodedialer.Dialer, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, dialer *nodedialer.Dialer, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { transport, tErr := kvcoord.GRPCTransportFactory(options, dialer, slice) interceptor := &interceptingTransport{ Transport: transport, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index e89be712c8c7..513f6cdf3fae 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -766,7 +766,7 @@ func newTransportForRange( } replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, latencyFn, ds.locality) opts := SendOptions{class: connectionClass(&ds.st.SV)} - return ds.transportFactory(opts, ds.nodeDialer, replicas) + return ds.transportFactory(opts, ds.nodeDialer, replicas.AsReplicaSet()) } // makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go index f967a97521a3..1d989a99416d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -157,7 +157,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { RPCRetryOptions: &retry.Options{MaxRetries: 10}, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ - TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { + TransportFactory: func(SendOptions, *nodedialer.Dialer, roachpb.ReplicaSet) (Transport, error) { return transport, nil }, }, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index 392ee42b260c..3b89baa2311d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -172,7 +172,7 @@ func makeTransportFactory( return func( options kvcoord.SendOptions, dialer *nodedialer.Dialer, - slice kvcoord.ReplicaSlice, + slice roachpb.ReplicaSet, ) (kvcoord.Transport, error) { transport, err := kvcoord.GRPCTransportFactory(options, dialer, slice) if err != nil { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 4dd2d15b6368..9d29f34c9236 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -144,7 +144,7 @@ func adaptSimpleTransport(fn simpleSendFn) TransportFactory { return func( _ SendOptions, _ *nodedialer.Dialer, - replicas ReplicaSlice, + replicas roachpb.ReplicaSet, ) (Transport, error) { return &simpleTransportAdapter{ fn: fn, @@ -418,7 +418,7 @@ func TestSendRPCOrder(t *testing.T) { var verifyCall func(SendOptions, []roachpb.ReplicaDescriptor) error var transportFactory TransportFactory = func( - opts SendOptions, dialer *nodedialer.Dialer, replicas ReplicaSlice, + opts SendOptions, dialer *nodedialer.Dialer, replicas roachpb.ReplicaSet, ) (Transport, error) { reps := replicas.Descriptors() if err := verifyCall(opts, reps); err != nil { @@ -3483,7 +3483,7 @@ func TestSenderTransport(t *testing.T) { ) (r *kvpb.BatchResponse, e *kvpb.Error) { return }, - ))(SendOptions{}, &nodedialer.Dialer{}, ReplicaSlice{{}}) + ))(SendOptions{}, &nodedialer.Dialer{}, roachpb.MakeReplicaSet([]roachpb.ReplicaDescriptor{{}})) if err != nil { t.Fatal(err) } @@ -4318,7 +4318,7 @@ func TestConnectionClass(t *testing.T) { // created. var class rpc.ConnectionClass var transportFactory TransportFactory = func( - opts SendOptions, dialer *nodedialer.Dialer, replicas ReplicaSlice, + opts SendOptions, dialer *nodedialer.Dialer, replicas roachpb.ReplicaSet, ) (Transport, error) { class = opts.class return adaptSimpleTransport( diff --git a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go index 975747c689ab..4904fdfc2767 100644 --- a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go +++ b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go @@ -99,7 +99,7 @@ func NewDistSenderForLocalTestCluster( TransportFactory: func( opts SendOptions, nodeDialer *nodedialer.Dialer, - replicas ReplicaSlice, + replicas roachpb.ReplicaSet, ) (Transport, error) { transport, err := senderTransportFactory(opts, nodeDialer, replicas) if err != nil { diff --git a/pkg/kv/kvclient/kvcoord/replayed_commit_test.go b/pkg/kv/kvclient/kvcoord/replayed_commit_test.go index 2276998b1da2..8158ea0b034d 100644 --- a/pkg/kv/kvclient/kvcoord/replayed_commit_test.go +++ b/pkg/kv/kvclient/kvcoord/replayed_commit_test.go @@ -55,7 +55,7 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T TransportFactory: func( options kvcoord.SendOptions, dialer *nodedialer.Dialer, - slice kvcoord.ReplicaSlice, + slice roachpb.ReplicaSet, ) (kvcoord.Transport, error) { tf, err := kvcoord.GRPCTransportFactory(options, dialer, slice) if err != nil { diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go index 4b985c69d03e..846ac7db90c7 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice.go @@ -266,6 +266,10 @@ func (rs ReplicaSlice) OptimizeReplicaOrder( }) } +func (rs ReplicaSlice) AsReplicaSet() roachpb.ReplicaSet { + return roachpb.MakeReplicaSet(rs.Descriptors()) +} + // Descriptors returns the ReplicaDescriptors inside the ReplicaSlice. func (rs ReplicaSlice) Descriptors() []roachpb.ReplicaDescriptor { reps := make([]roachpb.ReplicaDescriptor, len(rs)) diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index 48d7849a7d43..d072055756c0 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -154,13 +154,13 @@ func TestSendToOneClient(t *testing.T) { // firstNErrorTransport is a mock transport that sends an error on // requests to the first N addresses, then succeeds. type firstNErrorTransport struct { - replicas ReplicaSlice + replicas roachpb.ReplicaSet numErrors int numSent int } func (f *firstNErrorTransport) IsExhausted() bool { - return f.numSent >= len(f.replicas) + return f.numSent >= len(f.replicas.Descriptors()) } func (f *firstNErrorTransport) Release() {} @@ -183,7 +183,7 @@ func (f *firstNErrorTransport) NextInternalClient( } func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor { - return f.replicas[f.numSent].ReplicaDescriptor + return f.replicas.Descriptors()[f.numSent] } func (f *firstNErrorTransport) SkipReplica() { @@ -244,7 +244,7 @@ func TestComplexScenarios(t *testing.T) { func( _ SendOptions, _ *nodedialer.Dialer, - replicas ReplicaSlice, + replicas roachpb.ReplicaSet, ) (Transport, error) { return &firstNErrorTransport{ replicas: replicas, diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index d40d3134174a..4368956a9699 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -50,7 +50,7 @@ type SendOptions struct { // The caller is responsible for ordering the replicas in the slice according to // the order in which the should be tried. type TransportFactory func( - SendOptions, *nodedialer.Dialer, ReplicaSlice, + SendOptions, *nodedialer.Dialer, roachpb.ReplicaSet, ) (Transport, error) // Transport objects can send RPCs to one or more replicas of a range. @@ -103,24 +103,24 @@ const ( // During race builds, we wrap this to hold on to and read all obtained // requests in a tight loop, exposing data races; see transport_race.go. func grpcTransportFactoryImpl( - opts SendOptions, nodeDialer *nodedialer.Dialer, rs ReplicaSlice, + opts SendOptions, nodeDialer *nodedialer.Dialer, rs roachpb.ReplicaSet, ) (Transport, error) { transport := grpcTransportPool.Get().(*grpcTransport) // Grab the saved slice memory from grpcTransport. replicas := transport.replicas - if cap(replicas) < len(rs) { - replicas = make([]roachpb.ReplicaDescriptor, len(rs)) + descriptors := rs.Descriptors() + if cap(replicas) < len(descriptors) { + replicas = make([]roachpb.ReplicaDescriptor, len(descriptors)) } else { - replicas = replicas[:len(rs)] + replicas = replicas[:len(descriptors)] } // We'll map the index of the replica descriptor in its slice to its health. var health util.FastIntMap - for i := range rs { - r := &rs[i] - replicas[i] = r.ReplicaDescriptor - healthy := nodeDialer.ConnHealth(r.NodeID, opts.class) == nil + for i, desc := range descriptors { + replicas[i] = desc + healthy := nodeDialer.ConnHealth(desc.NodeID, opts.class) == nil if healthy { health.Set(i, healthHealthy) } else { @@ -320,10 +320,10 @@ func (h *byHealth) Less(i, j int) bool { // without a full RPC stack. func SenderTransportFactory(tracer *tracing.Tracer, sender kv.Sender) TransportFactory { return func( - _ SendOptions, _ *nodedialer.Dialer, replicas ReplicaSlice, + _ SendOptions, _ *nodedialer.Dialer, replicas roachpb.ReplicaSet, ) (Transport, error) { // Always send to the first replica. - replica := replicas[0].ReplicaDescriptor + replica := replicas.First() return &senderTransport{tracer, sender, replica, false}, nil } } diff --git a/pkg/kv/kvclient/kvcoord/transport_race.go b/pkg/kv/kvclient/kvcoord/transport_race.go index eac19cef5ac5..ca6fe2548f6f 100644 --- a/pkg/kv/kvclient/kvcoord/transport_race.go +++ b/pkg/kv/kvclient/kvcoord/transport_race.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -92,7 +93,7 @@ func (tr raceTransport) SendNext( // a) the server doesn't hold on to any memory, and // b) the server doesn't mutate the request func GRPCTransportFactory( - opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice, + opts SendOptions, nodeDialer *nodedialer.Dialer, replicas roachpb.ReplicaSet, ) (Transport, error) { if atomic.AddInt32(&running, 1) <= 1 { if err := nodeDialer.Stopper().RunAsyncTask( diff --git a/pkg/kv/kvclient/kvcoord/transport_regular.go b/pkg/kv/kvclient/kvcoord/transport_regular.go index 84e4b35790dd..7e3220254836 100644 --- a/pkg/kv/kvclient/kvcoord/transport_regular.go +++ b/pkg/kv/kvclient/kvcoord/transport_regular.go @@ -13,11 +13,14 @@ package kvcoord -import "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" +) // GRPCTransportFactory is the default TransportFactory, using GRPC. func GRPCTransportFactory( - opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice, + opts SendOptions, nodeDialer *nodedialer.Dialer, replicas roachpb.ReplicaSet, ) (Transport, error) { return grpcTransportFactoryImpl(opts, nodeDialer, replicas) } diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 9c764568c9e0..aeb322d97d5d 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -47,6 +47,10 @@ func (d ReplicaSet) String() string { return redact.StringWithoutMarkers(d) } +func (d ReplicaSet) First() ReplicaDescriptor { + return d.wrapped[0] +} + // Descriptors returns every replica descriptor in the set, including both voter // replicas and learner replicas. Voter replicas are ordered first in the // returned slice. diff --git a/pkg/sql/ambiguous_commit_test.go b/pkg/sql/ambiguous_commit_test.go index 52fd6fd31df7..ea6a914bbfc2 100644 --- a/pkg/sql/ambiguous_commit_test.go +++ b/pkg/sql/ambiguous_commit_test.go @@ -94,7 +94,7 @@ func TestAmbiguousCommit(t *testing.T) { params.Knobs.KVClient = &kvcoord.ClientTestingKnobs{ TransportFactory: func( - opts kvcoord.SendOptions, nodeDialer *nodedialer.Dialer, replicas kvcoord.ReplicaSlice, + opts kvcoord.SendOptions, nodeDialer *nodedialer.Dialer, replicas roachpb.ReplicaSet, ) (kvcoord.Transport, error) { transport, err := kvcoord.GRPCTransportFactory(opts, nodeDialer, replicas) return &interceptingTransport{