From ad7cc7a36dda15c2f9f30f413f32de31a0d7b1db Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Thu, 9 Nov 2023 09:24:18 -0500 Subject: [PATCH] kvclient: replace ReplicaSlice with ReplicaSet for routing This PR replaces ReplicaSlice with ReplicaSet outside of the DistSender. ReplicaSlice is an internal implementation which contains information only required for sorting the replicas. Outside of DistSender the additional sorting information is unnecessary and unused. Epic: none Informs: #112351 Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 2 +- .../kvcoord/dist_sender_ambiguous_test.go | 2 +- .../kvclient/kvcoord/dist_sender_rangefeed.go | 2 +- .../dist_sender_rangefeed_mock_test.go | 2 +- .../kvcoord/dist_sender_rangefeed_test.go | 2 +- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 10 ++++----- .../kvcoord/local_test_cluster_util.go | 2 +- pkg/kv/kvclient/kvcoord/range_iter_test.go | 2 +- .../kvclient/kvcoord/replayed_commit_test.go | 3 +-- pkg/kv/kvclient/kvcoord/replica_slice.go | 4 ++++ pkg/kv/kvclient/kvcoord/send_test.go | 8 +++---- pkg/kv/kvclient/kvcoord/transport.go | 22 +++++++++---------- pkg/kv/kvclient/kvcoord/transport_race.go | 3 ++- pkg/kv/kvclient/kvcoord/transport_regular.go | 7 ++++-- pkg/roachpb/metadata_replicas.go | 4 ++++ pkg/sql/ambiguous_commit_test.go | 2 +- 16 files changed, 44 insertions(+), 33 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index cb942e31d897..776989ff03af 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2299,7 +2299,7 @@ func (ds *DistSender) sendToReplicas( metrics: &ds.metrics, dontConsiderConnHealth: ds.dontConsiderConnHealth, } - transport, err := ds.transportFactory(opts, replicas) + transport, err := ds.transportFactory(opts, replicas.AsReplicaSet()) if err != nil { return nil, err } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go index aefbef501607..13d1544bfb12 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go @@ -295,7 +295,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { } getInterceptingTransportFactory := func(nID roachpb.NodeID) func(kvcoord.TransportFactory) kvcoord.TransportFactory { return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { transport, tErr := factory(options, slice) interceptor := &interceptingTransport{ Transport: transport, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index ef1946ea4e86..06c352dfe9cc 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -756,7 +756,7 @@ func newTransportForRange( } replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality) opts := SendOptions{class: defRangefeedConnClass} - return ds.transportFactory(opts, replicas) + return ds.transportFactory(opts, replicas.AsReplicaSet()) } // makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go index 1d0688206e86..cb8be6a2e168 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -146,7 +146,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { NodeDescs: g, RPCRetryOptions: &retry.Options{MaxRetries: 10}, Stopper: stopper, - TransportFactory: func(options SendOptions, slice ReplicaSlice) (Transport, error) { + TransportFactory: func(options SendOptions, slice roachpb.ReplicaSet) (Transport, error) { return transport, nil }, RangeDescriptorDB: rangeDB, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index 149e6614e0db..68448c55bc44 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -165,7 +165,7 @@ func makeTransportFactory( rfStreamEnabled bool, counts *internalClientCounts, wrapFn wrapRangeFeedClientFn, ) func(kvcoord.TransportFactory) kvcoord.TransportFactory { return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { transport, err := factory(options, slice) if err != nil { return nil, err diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index da0a67fff870..9d37e72af885 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -143,7 +143,7 @@ var stubRPCSendFn simpleSendFn = func( func adaptSimpleTransport(fn simpleSendFn) TransportFactory { return func( _ SendOptions, - replicas ReplicaSlice, + replicas roachpb.ReplicaSet, ) (Transport, error) { return &simpleTransportAdapter{ fn: fn, @@ -401,7 +401,7 @@ func TestSendRPCOrder(t *testing.T) { var verifyCall func(SendOptions, []roachpb.ReplicaDescriptor) error var transportFactory TransportFactory = func( - opts SendOptions, replicas ReplicaSlice, + opts SendOptions, replicas roachpb.ReplicaSet, ) (Transport, error) { reps := replicas.Descriptors() if err := verifyCall(opts, reps); err != nil { @@ -3366,7 +3366,7 @@ func TestSenderTransport(t *testing.T) { ) (r *kvpb.BatchResponse, e *kvpb.Error) { return }, - ))(SendOptions{}, ReplicaSlice{{}}) + ))(SendOptions{}, roachpb.MakeReplicaSet([]roachpb.ReplicaDescriptor{{}})) if err != nil { t.Fatal(err) } @@ -4185,7 +4185,7 @@ func TestConnectionClass(t *testing.T) { // class will capture the connection class used for the last transport // created. var class rpc.ConnectionClass - var transportFactory TransportFactory = func(opts SendOptions, replicas ReplicaSlice) (Transport, error) { + var transportFactory TransportFactory = func(opts SendOptions, replicas roachpb.ReplicaSet) (Transport, error) { class = opts.class return adaptSimpleTransport( func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { @@ -5455,7 +5455,7 @@ func TestDistSenderComputeNetworkCost(t *testing.T) { tc.cfg.Stopper = stopper tc.cfg.RangeDescriptorDB = rddb tc.cfg.Settings = st - tc.cfg.TransportFactory = func(SendOptions, ReplicaSlice) (Transport, error) { + tc.cfg.TransportFactory = func(SendOptions, roachpb.ReplicaSet) (Transport, error) { assert.Fail(t, "test should not try and use the transport factory") return nil, nil } diff --git a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go index bffb408877fc..ef74928c4336 100644 --- a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go +++ b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go @@ -90,7 +90,7 @@ func NewDistSenderForLocalTestCluster( Stopper: stopper, RPCRetryOptions: &retryOpts, FirstRangeProvider: g, - TransportFactory: func(opts SendOptions, replicas ReplicaSlice) (Transport, error) { + TransportFactory: func(opts SendOptions, replicas roachpb.ReplicaSet) (Transport, error) { transport, err := senderTransportFactory(opts, replicas) if err != nil { return nil, err diff --git a/pkg/kv/kvclient/kvcoord/range_iter_test.go b/pkg/kv/kvclient/kvcoord/range_iter_test.go index 36f716134004..fb602a718f07 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter_test.go +++ b/pkg/kv/kvclient/kvcoord/range_iter_test.go @@ -48,7 +48,7 @@ func init() { alphaRangeDescriptorDB = mockRangeDescriptorDBForDescs( append(alphaRangeDescriptors, TestMetaRangeDescriptor)..., ) - tf = func(options SendOptions, slice ReplicaSlice) (Transport, error) { + tf = func(options SendOptions, slice roachpb.ReplicaSet) (Transport, error) { panic("transport not set up for use") } } diff --git a/pkg/kv/kvclient/kvcoord/replayed_commit_test.go b/pkg/kv/kvclient/kvcoord/replayed_commit_test.go index 58916c9d13ea..5de9cc688776 100644 --- a/pkg/kv/kvclient/kvcoord/replayed_commit_test.go +++ b/pkg/kv/kvclient/kvcoord/replayed_commit_test.go @@ -52,7 +52,7 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T } args.ServerArgs.Knobs.KVClient = &kvcoord.ClientTestingKnobs{ TransportFactory: func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { tf, err := factory(options, slice) if err != nil { return nil, err @@ -74,7 +74,6 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T }, nil } - }, } diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go index 4b985c69d03e..846ac7db90c7 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice.go @@ -266,6 +266,10 @@ func (rs ReplicaSlice) OptimizeReplicaOrder( }) } +func (rs ReplicaSlice) AsReplicaSet() roachpb.ReplicaSet { + return roachpb.MakeReplicaSet(rs.Descriptors()) +} + // Descriptors returns the ReplicaDescriptors inside the ReplicaSlice. func (rs ReplicaSlice) Descriptors() []roachpb.ReplicaDescriptor { reps := make([]roachpb.ReplicaDescriptor, len(rs)) diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index d8ec5754a9fd..37db6ee59ece 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -153,13 +153,13 @@ func TestSendToOneClient(t *testing.T) { // firstNErrorTransport is a mock transport that sends an error on // requests to the first N addresses, then succeeds. type firstNErrorTransport struct { - replicas ReplicaSlice + replicas roachpb.ReplicaSet numErrors int numSent int } func (f *firstNErrorTransport) IsExhausted() bool { - return f.numSent >= len(f.replicas) + return f.numSent >= len(f.replicas.Descriptors()) } func (f *firstNErrorTransport) Release() {} @@ -182,7 +182,7 @@ func (f *firstNErrorTransport) NextInternalClient( } func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor { - return f.replicas[f.numSent].ReplicaDescriptor + return f.replicas.Descriptors()[f.numSent] } func (f *firstNErrorTransport) SkipReplica() { @@ -237,7 +237,7 @@ func TestComplexScenarios(t *testing.T) { t, func( _ SendOptions, - replicas ReplicaSlice, + replicas roachpb.ReplicaSet, ) (Transport, error) { return &firstNErrorTransport{ replicas: replicas, diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index 3ff054e0ef2d..4432f76e2918 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -49,7 +49,7 @@ type SendOptions struct { // // The caller is responsible for ordering the replicas in the slice according to // the order in which the should be tried. -type TransportFactory func(SendOptions, ReplicaSlice) (Transport, error) +type TransportFactory func(SendOptions, roachpb.ReplicaSet) (Transport, error) // Transport objects can send RPCs to one or more replicas of a range. // All calls to Transport methods are made from a single thread, so @@ -94,24 +94,24 @@ const ( // During race builds, we wrap this to hold on to and read all obtained // requests in a tight loop, exposing data races; see transport_race.go. func grpcTransportFactoryImpl( - opts SendOptions, nodeDialer *nodedialer.Dialer, rs ReplicaSlice, + opts SendOptions, nodeDialer *nodedialer.Dialer, rs roachpb.ReplicaSet, ) (Transport, error) { transport := grpcTransportPool.Get().(*grpcTransport) // Grab the saved slice memory from grpcTransport. replicas := transport.replicas - if cap(replicas) < len(rs) { - replicas = make([]roachpb.ReplicaDescriptor, len(rs)) + descriptors := rs.Descriptors() + if cap(replicas) < len(descriptors) { + replicas = make([]roachpb.ReplicaDescriptor, len(descriptors)) } else { - replicas = replicas[:len(rs)] + replicas = replicas[:len(descriptors)] } // We'll map the index of the replica descriptor in its slice to its health. var health util.FastIntMap - for i := range rs { - r := &rs[i] - replicas[i] = r.ReplicaDescriptor - healthy := nodeDialer.ConnHealth(r.NodeID, opts.class) == nil + for i, desc := range descriptors { + replicas[i] = desc + healthy := nodeDialer.ConnHealth(desc.NodeID, opts.class) == nil if healthy { health.Set(i, healthHealthy) } else { @@ -294,9 +294,9 @@ func (h *byHealth) Less(i, j int) bool { // Transport. This is useful for tests that want to use DistSender // without a full RPC stack. func SenderTransportFactory(tracer *tracing.Tracer, sender kv.Sender) TransportFactory { - return func(_ SendOptions, replicas ReplicaSlice) (Transport, error) { + return func(_ SendOptions, replicas roachpb.ReplicaSet) (Transport, error) { // Always send to the first replica. - replica := replicas[0].ReplicaDescriptor + replica := replicas.First() return &senderTransport{tracer, sender, replica, false}, nil } } diff --git a/pkg/kv/kvclient/kvcoord/transport_race.go b/pkg/kv/kvclient/kvcoord/transport_race.go index 1033e4cc39bf..ba5b804a037c 100644 --- a/pkg/kv/kvclient/kvcoord/transport_race.go +++ b/pkg/kv/kvclient/kvcoord/transport_race.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -92,7 +93,7 @@ func (tr raceTransport) SendNext( // a) the server doesn't hold on to any memory, and // b) the server doesn't mutate the request func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory { - return func(opts SendOptions, replicas ReplicaSlice) (Transport, error) { + return func(opts SendOptions, replicas roachpb.ReplicaSet) (Transport, error) { if atomic.AddInt32(&running, 1) <= 1 { if err := nodeDialer.Stopper().RunAsyncTask( context.TODO(), "transport racer", func(ctx context.Context) { diff --git a/pkg/kv/kvclient/kvcoord/transport_regular.go b/pkg/kv/kvclient/kvcoord/transport_regular.go index b20c16280232..82829cfae800 100644 --- a/pkg/kv/kvclient/kvcoord/transport_regular.go +++ b/pkg/kv/kvclient/kvcoord/transport_regular.go @@ -13,11 +13,14 @@ package kvcoord -import "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" +) // GRPCTransportFactory is the default TransportFactory, using GRPC. func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory { - return func(options SendOptions, slice ReplicaSlice) (Transport, error) { + return func(options SendOptions, slice roachpb.ReplicaSet) (Transport, error) { return grpcTransportFactoryImpl(options, nodeDialer, slice) } } diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 9c764568c9e0..aeb322d97d5d 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -47,6 +47,10 @@ func (d ReplicaSet) String() string { return redact.StringWithoutMarkers(d) } +func (d ReplicaSet) First() ReplicaDescriptor { + return d.wrapped[0] +} + // Descriptors returns every replica descriptor in the set, including both voter // replicas and learner replicas. Voter replicas are ordered first in the // returned slice. diff --git a/pkg/sql/ambiguous_commit_test.go b/pkg/sql/ambiguous_commit_test.go index 54249166032a..18b8ae5dbfac 100644 --- a/pkg/sql/ambiguous_commit_test.go +++ b/pkg/sql/ambiguous_commit_test.go @@ -93,7 +93,7 @@ func TestAmbiguousCommit(t *testing.T) { params.Knobs.KVClient = &kvcoord.ClientTestingKnobs{ TransportFactory: func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { transport, err := factory(options, slice) return &interceptingTransport{ Transport: transport,