diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index e6ba512af22c..b66c28214808 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2301,7 +2301,7 @@ func (ds *DistSender) sendToReplicas( metrics: &ds.metrics, dontConsiderConnHealth: ds.dontConsiderConnHealth, } - transport, err := ds.transportFactory(opts, replicas) + transport, err := ds.transportFactory(opts, replicas.AsReplicaSet()) if err != nil { return nil, err } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go index 017aa011b695..771348c949c7 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go @@ -297,7 +297,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { } getInterceptingTransportFactory := func(nID roachpb.NodeID) func(kvcoord.TransportFactory) kvcoord.TransportFactory { return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { transport, tErr := factory(options, slice) interceptor := &interceptingTransport{ Transport: transport, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index ef1946ea4e86..06c352dfe9cc 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -756,7 +756,7 @@ func newTransportForRange( } replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality) opts := SendOptions{class: defRangefeedConnClass} - return ds.transportFactory(opts, replicas) + return ds.transportFactory(opts, replicas.AsReplicaSet()) } // makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go index 1d0688206e86..cb8be6a2e168 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -146,7 +146,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { NodeDescs: g, RPCRetryOptions: &retry.Options{MaxRetries: 10}, Stopper: stopper, - TransportFactory: func(options SendOptions, slice ReplicaSlice) (Transport, error) { + TransportFactory: func(options SendOptions, slice roachpb.ReplicaSet) (Transport, error) { return transport, nil }, RangeDescriptorDB: rangeDB, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index bc2eb068df5e..2075e3b0cd23 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -169,7 +169,7 @@ func makeTransportFactory( rfStreamEnabled bool, counts *internalClientCounts, wrapFn wrapRangeFeedClientFn, ) func(kvcoord.TransportFactory) kvcoord.TransportFactory { return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { transport, err := factory(options, slice) if err != nil { return nil, err diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index f442071952c9..f09df4db1f72 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -145,7 +145,7 @@ var stubRPCSendFn simpleSendFn = func( func adaptSimpleTransport(fn simpleSendFn) TransportFactory { return func( _ SendOptions, - replicas ReplicaSlice, + replicas roachpb.ReplicaSet, ) (Transport, error) { return &simpleTransportAdapter{ fn: fn, @@ -419,7 +419,7 @@ func TestSendRPCOrder(t *testing.T) { var verifyCall func(SendOptions, []roachpb.ReplicaDescriptor) error var transportFactory TransportFactory = func( - opts SendOptions, replicas ReplicaSlice, + opts SendOptions, replicas roachpb.ReplicaSet, ) (Transport, error) { reps := replicas.Descriptors() if err := verifyCall(opts, reps); err != nil { @@ -3391,7 +3391,7 @@ func TestSenderTransport(t *testing.T) { ) (r *kvpb.BatchResponse, e *kvpb.Error) { return }, - ))(SendOptions{}, ReplicaSlice{{}}) + ))(SendOptions{}, roachpb.MakeReplicaSet([]roachpb.ReplicaDescriptor{{}})) if err != nil { t.Fatal(err) } @@ -4210,7 +4210,7 @@ func TestConnectionClass(t *testing.T) { // class will capture the connection class used for the last transport // created. var class rpc.ConnectionClass - var transportFactory TransportFactory = func(opts SendOptions, replicas ReplicaSlice) (Transport, error) { + var transportFactory TransportFactory = func(opts SendOptions, replicas roachpb.ReplicaSet) (Transport, error) { class = opts.class return adaptSimpleTransport( func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { @@ -5287,20 +5287,10 @@ func TestDistSenderComputeNetworkCost(t *testing.T) { } } - makeReplicaInfo := func(replicaID int, region string) ReplicaInfo { - return ReplicaInfo{ - ReplicaDescriptor: roachpb.ReplicaDescriptor{ - ReplicaID: roachpb.ReplicaID(replicaID), - }, - Locality: makeLocality(region), - } - } - for _, tc := range []struct { name string cfg *DistSenderConfig desc *roachpb.RangeDescriptor - replicas ReplicaSlice curReplica *roachpb.ReplicaDescriptor expectedRead tenantcostmodel.NetworkCost expectedWrite tenantcostmodel.NetworkCost @@ -5368,12 +5358,7 @@ func TestDistSenderComputeNetworkCost(t *testing.T) { {Key: "region", Value: "eu-central1"}, }}, }, - desc: newRangeDescriptor(10), - replicas: []ReplicaInfo{ - makeReplicaInfo(1, "foo"), - makeReplicaInfo(2, "bar"), - makeReplicaInfo(3, ""), // Missing region. - }, + desc: newRangeDescriptor(10), curReplica: &roachpb.ReplicaDescriptor{ReplicaID: 3}, expectedRead: 0, expectedWrite: 0, @@ -5479,7 +5464,7 @@ func TestDistSenderComputeNetworkCost(t *testing.T) { tc.cfg.Stopper = stopper tc.cfg.RangeDescriptorDB = rddb tc.cfg.Settings = st - tc.cfg.TransportFactory = func(SendOptions, ReplicaSlice) (Transport, error) { + tc.cfg.TransportFactory = func(SendOptions, roachpb.ReplicaSet) (Transport, error) { assert.Fail(t, "test should not try and use the transport factory") return nil, nil } diff --git a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go index bffb408877fc..ef74928c4336 100644 --- a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go +++ b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go @@ -90,7 +90,7 @@ func NewDistSenderForLocalTestCluster( Stopper: stopper, RPCRetryOptions: &retryOpts, FirstRangeProvider: g, - TransportFactory: func(opts SendOptions, replicas ReplicaSlice) (Transport, error) { + TransportFactory: func(opts SendOptions, replicas roachpb.ReplicaSet) (Transport, error) { transport, err := senderTransportFactory(opts, replicas) if err != nil { return nil, err diff --git a/pkg/kv/kvclient/kvcoord/range_iter_test.go b/pkg/kv/kvclient/kvcoord/range_iter_test.go index 36f716134004..fb602a718f07 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter_test.go +++ b/pkg/kv/kvclient/kvcoord/range_iter_test.go @@ -48,7 +48,7 @@ func init() { alphaRangeDescriptorDB = mockRangeDescriptorDBForDescs( append(alphaRangeDescriptors, TestMetaRangeDescriptor)..., ) - tf = func(options SendOptions, slice ReplicaSlice) (Transport, error) { + tf = func(options SendOptions, slice roachpb.ReplicaSet) (Transport, error) { panic("transport not set up for use") } } diff --git a/pkg/kv/kvclient/kvcoord/replayed_commit_test.go b/pkg/kv/kvclient/kvcoord/replayed_commit_test.go index 58916c9d13ea..5de9cc688776 100644 --- a/pkg/kv/kvclient/kvcoord/replayed_commit_test.go +++ b/pkg/kv/kvclient/kvcoord/replayed_commit_test.go @@ -52,7 +52,7 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T } args.ServerArgs.Knobs.KVClient = &kvcoord.ClientTestingKnobs{ TransportFactory: func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { tf, err := factory(options, slice) if err != nil { return nil, err @@ -74,7 +74,6 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T }, nil } - }, } diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go index 4b985c69d03e..846ac7db90c7 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice.go @@ -266,6 +266,10 @@ func (rs ReplicaSlice) OptimizeReplicaOrder( }) } +func (rs ReplicaSlice) AsReplicaSet() roachpb.ReplicaSet { + return roachpb.MakeReplicaSet(rs.Descriptors()) +} + // Descriptors returns the ReplicaDescriptors inside the ReplicaSlice. func (rs ReplicaSlice) Descriptors() []roachpb.ReplicaDescriptor { reps := make([]roachpb.ReplicaDescriptor, len(rs)) diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index 4e7a8164fad2..db83749eeca1 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -153,13 +153,13 @@ func TestSendToOneClient(t *testing.T) { // firstNErrorTransport is a mock transport that sends an error on // requests to the first N addresses, then succeeds. type firstNErrorTransport struct { - replicas ReplicaSlice + replicas roachpb.ReplicaSet numErrors int numSent int } func (f *firstNErrorTransport) IsExhausted() bool { - return f.numSent >= len(f.replicas) + return f.numSent >= len(f.replicas.Descriptors()) } func (f *firstNErrorTransport) Release() {} @@ -182,7 +182,7 @@ func (f *firstNErrorTransport) NextInternalClient( } func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor { - return f.replicas[f.numSent].ReplicaDescriptor + return f.replicas.Descriptors()[f.numSent] } func (f *firstNErrorTransport) SkipReplica() { @@ -241,7 +241,7 @@ func TestComplexScenarios(t *testing.T) { t, func( _ SendOptions, - replicas ReplicaSlice, + replicas roachpb.ReplicaSet, ) (Transport, error) { return &firstNErrorTransport{ replicas: replicas, diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index 7e9a9206789a..130a813b159c 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -49,7 +49,7 @@ type SendOptions struct { // // The caller is responsible for ordering the replicas in the slice according to // the order in which the should be tried. -type TransportFactory func(SendOptions, ReplicaSlice) (Transport, error) +type TransportFactory func(SendOptions, roachpb.ReplicaSet) (Transport, error) // Transport objects can send RPCs to one or more replicas of a range. // All calls to Transport methods are made from a single thread, so @@ -101,24 +101,24 @@ const ( // During race builds, we wrap this to hold on to and read all obtained // requests in a tight loop, exposing data races; see transport_race.go. func grpcTransportFactoryImpl( - opts SendOptions, nodeDialer *nodedialer.Dialer, rs ReplicaSlice, + opts SendOptions, nodeDialer *nodedialer.Dialer, rs roachpb.ReplicaSet, ) (Transport, error) { transport := grpcTransportPool.Get().(*grpcTransport) // Grab the saved slice memory from grpcTransport. replicas := transport.replicas - if cap(replicas) < len(rs) { - replicas = make([]roachpb.ReplicaDescriptor, len(rs)) + descriptors := rs.Descriptors() + if cap(replicas) < len(descriptors) { + replicas = make([]roachpb.ReplicaDescriptor, len(descriptors)) } else { - replicas = replicas[:len(rs)] + replicas = replicas[:len(descriptors)] } // We'll map the index of the replica descriptor in its slice to its health. var health util.FastIntMap - for i := range rs { - r := &rs[i] - replicas[i] = r.ReplicaDescriptor - healthy := nodeDialer.ConnHealth(r.NodeID, opts.class) == nil + for i, desc := range descriptors { + replicas[i] = desc + healthy := nodeDialer.ConnHealth(desc.NodeID, opts.class) == nil if healthy { health.Set(i, healthHealthy) } else { @@ -317,9 +317,9 @@ func (h *byHealth) Less(i, j int) bool { // Transport. This is useful for tests that want to use DistSender // without a full RPC stack. func SenderTransportFactory(tracer *tracing.Tracer, sender kv.Sender) TransportFactory { - return func(_ SendOptions, replicas ReplicaSlice) (Transport, error) { + return func(_ SendOptions, replicas roachpb.ReplicaSet) (Transport, error) { // Always send to the first replica. - replica := replicas[0].ReplicaDescriptor + replica := replicas.First() return &senderTransport{tracer, sender, replica, false}, nil } } diff --git a/pkg/kv/kvclient/kvcoord/transport_race.go b/pkg/kv/kvclient/kvcoord/transport_race.go index 1033e4cc39bf..ba5b804a037c 100644 --- a/pkg/kv/kvclient/kvcoord/transport_race.go +++ b/pkg/kv/kvclient/kvcoord/transport_race.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -92,7 +93,7 @@ func (tr raceTransport) SendNext( // a) the server doesn't hold on to any memory, and // b) the server doesn't mutate the request func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory { - return func(opts SendOptions, replicas ReplicaSlice) (Transport, error) { + return func(opts SendOptions, replicas roachpb.ReplicaSet) (Transport, error) { if atomic.AddInt32(&running, 1) <= 1 { if err := nodeDialer.Stopper().RunAsyncTask( context.TODO(), "transport racer", func(ctx context.Context) { diff --git a/pkg/kv/kvclient/kvcoord/transport_regular.go b/pkg/kv/kvclient/kvcoord/transport_regular.go index b20c16280232..82829cfae800 100644 --- a/pkg/kv/kvclient/kvcoord/transport_regular.go +++ b/pkg/kv/kvclient/kvcoord/transport_regular.go @@ -13,11 +13,14 @@ package kvcoord -import "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" +) // GRPCTransportFactory is the default TransportFactory, using GRPC. func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory { - return func(options SendOptions, slice ReplicaSlice) (Transport, error) { + return func(options SendOptions, slice roachpb.ReplicaSet) (Transport, error) { return grpcTransportFactoryImpl(options, nodeDialer, slice) } } diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 9c764568c9e0..aeb322d97d5d 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -47,6 +47,10 @@ func (d ReplicaSet) String() string { return redact.StringWithoutMarkers(d) } +func (d ReplicaSet) First() ReplicaDescriptor { + return d.wrapped[0] +} + // Descriptors returns every replica descriptor in the set, including both voter // replicas and learner replicas. Voter replicas are ordered first in the // returned slice. diff --git a/pkg/sql/ambiguous_commit_test.go b/pkg/sql/ambiguous_commit_test.go index 54249166032a..18b8ae5dbfac 100644 --- a/pkg/sql/ambiguous_commit_test.go +++ b/pkg/sql/ambiguous_commit_test.go @@ -93,7 +93,7 @@ func TestAmbiguousCommit(t *testing.T) { params.Knobs.KVClient = &kvcoord.ClientTestingKnobs{ TransportFactory: func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { transport, err := factory(options, slice) return &interceptingTransport{ Transport: transport,