diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 7024122177c5..a177c1b744c8 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2309,7 +2309,7 @@ func (ds *DistSender) sendToReplicas( metrics: &ds.metrics, dontConsiderConnHealth: ds.dontConsiderConnHealth, } - transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) + transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas.AsReplicaSet()) if err != nil { return nil, err } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go index 34c19b9924b0..df2946095cd5 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go @@ -297,7 +297,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { }, } getInterceptingTransportFactory := func(nID roachpb.NodeID) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, dialer *nodedialer.Dialer, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, dialer *nodedialer.Dialer, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { transport, tErr := kvcoord.GRPCTransportFactory(options, dialer, slice) interceptor := &interceptingTransport{ Transport: transport, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index e89be712c8c7..513f6cdf3fae 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -766,7 +766,7 @@ func newTransportForRange( } replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, latencyFn, ds.locality) opts := SendOptions{class: connectionClass(&ds.st.SV)} - return ds.transportFactory(opts, ds.nodeDialer, replicas) + return ds.transportFactory(opts, ds.nodeDialer, replicas.AsReplicaSet()) } // makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go index f967a97521a3..1d989a99416d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -157,7 +157,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { RPCRetryOptions: &retry.Options{MaxRetries: 10}, RPCContext: rpcContext, TestingKnobs: ClientTestingKnobs{ - TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { + TransportFactory: func(SendOptions, *nodedialer.Dialer, roachpb.ReplicaSet) (Transport, error) { return transport, nil }, }, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index 392ee42b260c..3b89baa2311d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -172,7 +172,7 @@ func makeTransportFactory( return func( options kvcoord.SendOptions, dialer *nodedialer.Dialer, - slice kvcoord.ReplicaSlice, + slice roachpb.ReplicaSet, ) (kvcoord.Transport, error) { transport, err := kvcoord.GRPCTransportFactory(options, dialer, slice) if err != nil { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 4dd2d15b6368..9d29f34c9236 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -144,7 +144,7 @@ func adaptSimpleTransport(fn simpleSendFn) TransportFactory { return func( _ SendOptions, _ *nodedialer.Dialer, - replicas ReplicaSlice, + replicas roachpb.ReplicaSet, ) (Transport, error) { return &simpleTransportAdapter{ fn: fn, @@ -418,7 +418,7 @@ func TestSendRPCOrder(t *testing.T) { var verifyCall func(SendOptions, []roachpb.ReplicaDescriptor) error var transportFactory TransportFactory = func( - opts SendOptions, dialer *nodedialer.Dialer, replicas ReplicaSlice, + opts SendOptions, dialer *nodedialer.Dialer, replicas roachpb.ReplicaSet, ) (Transport, error) { reps := replicas.Descriptors() if err := verifyCall(opts, reps); err != nil { @@ -3483,7 +3483,7 @@ func TestSenderTransport(t *testing.T) { ) (r *kvpb.BatchResponse, e *kvpb.Error) { return }, - ))(SendOptions{}, &nodedialer.Dialer{}, ReplicaSlice{{}}) + ))(SendOptions{}, &nodedialer.Dialer{}, roachpb.MakeReplicaSet([]roachpb.ReplicaDescriptor{{}})) if err != nil { t.Fatal(err) } @@ -4318,7 +4318,7 @@ func TestConnectionClass(t *testing.T) { // created. var class rpc.ConnectionClass var transportFactory TransportFactory = func( - opts SendOptions, dialer *nodedialer.Dialer, replicas ReplicaSlice, + opts SendOptions, dialer *nodedialer.Dialer, replicas roachpb.ReplicaSet, ) (Transport, error) { class = opts.class return adaptSimpleTransport( diff --git a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go index 975747c689ab..4904fdfc2767 100644 --- a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go +++ b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go @@ -99,7 +99,7 @@ func NewDistSenderForLocalTestCluster( TransportFactory: func( opts SendOptions, nodeDialer *nodedialer.Dialer, - replicas ReplicaSlice, + replicas roachpb.ReplicaSet, ) (Transport, error) { transport, err := senderTransportFactory(opts, nodeDialer, replicas) if err != nil { diff --git a/pkg/kv/kvclient/kvcoord/replayed_commit_test.go b/pkg/kv/kvclient/kvcoord/replayed_commit_test.go index 2276998b1da2..8158ea0b034d 100644 --- a/pkg/kv/kvclient/kvcoord/replayed_commit_test.go +++ b/pkg/kv/kvclient/kvcoord/replayed_commit_test.go @@ -55,7 +55,7 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T TransportFactory: func( options kvcoord.SendOptions, dialer *nodedialer.Dialer, - slice kvcoord.ReplicaSlice, + slice roachpb.ReplicaSet, ) (kvcoord.Transport, error) { tf, err := kvcoord.GRPCTransportFactory(options, dialer, slice) if err != nil { diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go index 4b985c69d03e..846ac7db90c7 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice.go @@ -266,6 +266,10 @@ func (rs ReplicaSlice) OptimizeReplicaOrder( }) } +func (rs ReplicaSlice) AsReplicaSet() roachpb.ReplicaSet { + return roachpb.MakeReplicaSet(rs.Descriptors()) +} + // Descriptors returns the ReplicaDescriptors inside the ReplicaSlice. func (rs ReplicaSlice) Descriptors() []roachpb.ReplicaDescriptor { reps := make([]roachpb.ReplicaDescriptor, len(rs)) diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index 48d7849a7d43..d072055756c0 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -154,13 +154,13 @@ func TestSendToOneClient(t *testing.T) { // firstNErrorTransport is a mock transport that sends an error on // requests to the first N addresses, then succeeds. type firstNErrorTransport struct { - replicas ReplicaSlice + replicas roachpb.ReplicaSet numErrors int numSent int } func (f *firstNErrorTransport) IsExhausted() bool { - return f.numSent >= len(f.replicas) + return f.numSent >= len(f.replicas.Descriptors()) } func (f *firstNErrorTransport) Release() {} @@ -183,7 +183,7 @@ func (f *firstNErrorTransport) NextInternalClient( } func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor { - return f.replicas[f.numSent].ReplicaDescriptor + return f.replicas.Descriptors()[f.numSent] } func (f *firstNErrorTransport) SkipReplica() { @@ -244,7 +244,7 @@ func TestComplexScenarios(t *testing.T) { func( _ SendOptions, _ *nodedialer.Dialer, - replicas ReplicaSlice, + replicas roachpb.ReplicaSet, ) (Transport, error) { return &firstNErrorTransport{ replicas: replicas, diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index d40d3134174a..4368956a9699 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -50,7 +50,7 @@ type SendOptions struct { // The caller is responsible for ordering the replicas in the slice according to // the order in which the should be tried. type TransportFactory func( - SendOptions, *nodedialer.Dialer, ReplicaSlice, + SendOptions, *nodedialer.Dialer, roachpb.ReplicaSet, ) (Transport, error) // Transport objects can send RPCs to one or more replicas of a range. @@ -103,24 +103,24 @@ const ( // During race builds, we wrap this to hold on to and read all obtained // requests in a tight loop, exposing data races; see transport_race.go. func grpcTransportFactoryImpl( - opts SendOptions, nodeDialer *nodedialer.Dialer, rs ReplicaSlice, + opts SendOptions, nodeDialer *nodedialer.Dialer, rs roachpb.ReplicaSet, ) (Transport, error) { transport := grpcTransportPool.Get().(*grpcTransport) // Grab the saved slice memory from grpcTransport. replicas := transport.replicas - if cap(replicas) < len(rs) { - replicas = make([]roachpb.ReplicaDescriptor, len(rs)) + descriptors := rs.Descriptors() + if cap(replicas) < len(descriptors) { + replicas = make([]roachpb.ReplicaDescriptor, len(descriptors)) } else { - replicas = replicas[:len(rs)] + replicas = replicas[:len(descriptors)] } // We'll map the index of the replica descriptor in its slice to its health. var health util.FastIntMap - for i := range rs { - r := &rs[i] - replicas[i] = r.ReplicaDescriptor - healthy := nodeDialer.ConnHealth(r.NodeID, opts.class) == nil + for i, desc := range descriptors { + replicas[i] = desc + healthy := nodeDialer.ConnHealth(desc.NodeID, opts.class) == nil if healthy { health.Set(i, healthHealthy) } else { @@ -320,10 +320,10 @@ func (h *byHealth) Less(i, j int) bool { // without a full RPC stack. func SenderTransportFactory(tracer *tracing.Tracer, sender kv.Sender) TransportFactory { return func( - _ SendOptions, _ *nodedialer.Dialer, replicas ReplicaSlice, + _ SendOptions, _ *nodedialer.Dialer, replicas roachpb.ReplicaSet, ) (Transport, error) { // Always send to the first replica. - replica := replicas[0].ReplicaDescriptor + replica := replicas.First() return &senderTransport{tracer, sender, replica, false}, nil } } diff --git a/pkg/kv/kvclient/kvcoord/transport_race.go b/pkg/kv/kvclient/kvcoord/transport_race.go index eac19cef5ac5..ca6fe2548f6f 100644 --- a/pkg/kv/kvclient/kvcoord/transport_race.go +++ b/pkg/kv/kvclient/kvcoord/transport_race.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -92,7 +93,7 @@ func (tr raceTransport) SendNext( // a) the server doesn't hold on to any memory, and // b) the server doesn't mutate the request func GRPCTransportFactory( - opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice, + opts SendOptions, nodeDialer *nodedialer.Dialer, replicas roachpb.ReplicaSet, ) (Transport, error) { if atomic.AddInt32(&running, 1) <= 1 { if err := nodeDialer.Stopper().RunAsyncTask( diff --git a/pkg/kv/kvclient/kvcoord/transport_regular.go b/pkg/kv/kvclient/kvcoord/transport_regular.go index 84e4b35790dd..7e3220254836 100644 --- a/pkg/kv/kvclient/kvcoord/transport_regular.go +++ b/pkg/kv/kvclient/kvcoord/transport_regular.go @@ -13,11 +13,14 @@ package kvcoord -import "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" +) // GRPCTransportFactory is the default TransportFactory, using GRPC. func GRPCTransportFactory( - opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice, + opts SendOptions, nodeDialer *nodedialer.Dialer, replicas roachpb.ReplicaSet, ) (Transport, error) { return grpcTransportFactoryImpl(opts, nodeDialer, replicas) } diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 9c764568c9e0..aeb322d97d5d 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -47,6 +47,10 @@ func (d ReplicaSet) String() string { return redact.StringWithoutMarkers(d) } +func (d ReplicaSet) First() ReplicaDescriptor { + return d.wrapped[0] +} + // Descriptors returns every replica descriptor in the set, including both voter // replicas and learner replicas. Voter replicas are ordered first in the // returned slice. diff --git a/pkg/sql/ambiguous_commit_test.go b/pkg/sql/ambiguous_commit_test.go index 52fd6fd31df7..ea6a914bbfc2 100644 --- a/pkg/sql/ambiguous_commit_test.go +++ b/pkg/sql/ambiguous_commit_test.go @@ -94,7 +94,7 @@ func TestAmbiguousCommit(t *testing.T) { params.Knobs.KVClient = &kvcoord.ClientTestingKnobs{ TransportFactory: func( - opts kvcoord.SendOptions, nodeDialer *nodedialer.Dialer, replicas kvcoord.ReplicaSlice, + opts kvcoord.SendOptions, nodeDialer *nodedialer.Dialer, replicas roachpb.ReplicaSet, ) (kvcoord.Transport, error) { transport, err := kvcoord.GRPCTransportFactory(opts, nodeDialer, replicas) return &interceptingTransport{