Skip to content

Commit

Permalink
kvclient: replace ReplicaSlice with ReplicaSet for routing
Browse files Browse the repository at this point in the history
This PR replaces ReplicaSlice with ReplicaSet outside of the
DistSender. ReplicaSlice is an internal implementation which
contains information only required for sorting the replicas.
Outside of DistSender the additional sorting information is
unnecessary and unused.

Epic: none
Informs: #112351

Release note: None
  • Loading branch information
andrewbaptist committed Dec 18, 2023
1 parent 6fae6a7 commit e476a5c
Show file tree
Hide file tree
Showing 15 changed files with 42 additions and 30 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2309,7 +2309,7 @@ func (ds *DistSender) sendToReplicas(
metrics: &ds.metrics,
dontConsiderConnHealth: ds.dontConsiderConnHealth,
}
transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas)
transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas.AsReplicaSet())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
},
}
getInterceptingTransportFactory := func(nID roachpb.NodeID) kvcoord.TransportFactory {
return func(options kvcoord.SendOptions, dialer *nodedialer.Dialer, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) {
return func(options kvcoord.SendOptions, dialer *nodedialer.Dialer, slice roachpb.ReplicaSet) (kvcoord.Transport, error) {
transport, tErr := kvcoord.GRPCTransportFactory(options, dialer, slice)
interceptor := &interceptingTransport{
Transport: transport,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ func newTransportForRange(
}
replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, latencyFn, ds.locality)
opts := SendOptions{class: connectionClass(&ds.st.SV)}
return ds.transportFactory(opts, ds.nodeDialer, replicas)
return ds.transportFactory(opts, ds.nodeDialer, replicas.AsReplicaSet())
}

// makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
RPCRetryOptions: &retry.Options{MaxRetries: 10},
RPCContext: rpcContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) {
TransportFactory: func(SendOptions, *nodedialer.Dialer, roachpb.ReplicaSet) (Transport, error) {
return transport, nil
},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func makeTransportFactory(
return func(
options kvcoord.SendOptions,
dialer *nodedialer.Dialer,
slice kvcoord.ReplicaSlice,
slice roachpb.ReplicaSet,
) (kvcoord.Transport, error) {
transport, err := kvcoord.GRPCTransportFactory(options, dialer, slice)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func adaptSimpleTransport(fn simpleSendFn) TransportFactory {
return func(
_ SendOptions,
_ *nodedialer.Dialer,
replicas ReplicaSlice,
replicas roachpb.ReplicaSet,
) (Transport, error) {
return &simpleTransportAdapter{
fn: fn,
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestSendRPCOrder(t *testing.T) {
var verifyCall func(SendOptions, []roachpb.ReplicaDescriptor) error

var transportFactory TransportFactory = func(
opts SendOptions, dialer *nodedialer.Dialer, replicas ReplicaSlice,
opts SendOptions, dialer *nodedialer.Dialer, replicas roachpb.ReplicaSet,
) (Transport, error) {
reps := replicas.Descriptors()
if err := verifyCall(opts, reps); err != nil {
Expand Down Expand Up @@ -3483,7 +3483,7 @@ func TestSenderTransport(t *testing.T) {
) (r *kvpb.BatchResponse, e *kvpb.Error) {
return
},
))(SendOptions{}, &nodedialer.Dialer{}, ReplicaSlice{{}})
))(SendOptions{}, &nodedialer.Dialer{}, roachpb.MakeReplicaSet([]roachpb.ReplicaDescriptor{{}}))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -4318,7 +4318,7 @@ func TestConnectionClass(t *testing.T) {
// created.
var class rpc.ConnectionClass
var transportFactory TransportFactory = func(
opts SendOptions, dialer *nodedialer.Dialer, replicas ReplicaSlice,
opts SendOptions, dialer *nodedialer.Dialer, replicas roachpb.ReplicaSet,
) (Transport, error) {
class = opts.class
return adaptSimpleTransport(
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/local_test_cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewDistSenderForLocalTestCluster(
TransportFactory: func(
opts SendOptions,
nodeDialer *nodedialer.Dialer,
replicas ReplicaSlice,
replicas roachpb.ReplicaSet,
) (Transport, error) {
transport, err := senderTransportFactory(opts, nodeDialer, replicas)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/replayed_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T
TransportFactory: func(
options kvcoord.SendOptions,
dialer *nodedialer.Dialer,
slice kvcoord.ReplicaSlice,
slice roachpb.ReplicaSet,
) (kvcoord.Transport, error) {
tf, err := kvcoord.GRPCTransportFactory(options, dialer, slice)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/replica_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
})
}

func (rs ReplicaSlice) AsReplicaSet() roachpb.ReplicaSet {
return roachpb.MakeReplicaSet(rs.Descriptors())
}

// Descriptors returns the ReplicaDescriptors inside the ReplicaSlice.
func (rs ReplicaSlice) Descriptors() []roachpb.ReplicaDescriptor {
reps := make([]roachpb.ReplicaDescriptor, len(rs))
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ func TestSendToOneClient(t *testing.T) {
// firstNErrorTransport is a mock transport that sends an error on
// requests to the first N addresses, then succeeds.
type firstNErrorTransport struct {
replicas ReplicaSlice
replicas roachpb.ReplicaSet
numErrors int
numSent int
}

func (f *firstNErrorTransport) IsExhausted() bool {
return f.numSent >= len(f.replicas)
return f.numSent >= len(f.replicas.Descriptors())
}

func (f *firstNErrorTransport) Release() {}
Expand All @@ -183,7 +183,7 @@ func (f *firstNErrorTransport) NextInternalClient(
}

func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor {
return f.replicas[f.numSent].ReplicaDescriptor
return f.replicas.Descriptors()[f.numSent]
}

func (f *firstNErrorTransport) SkipReplica() {
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestComplexScenarios(t *testing.T) {
func(
_ SendOptions,
_ *nodedialer.Dialer,
replicas ReplicaSlice,
replicas roachpb.ReplicaSet,
) (Transport, error) {
return &firstNErrorTransport{
replicas: replicas,
Expand Down
22 changes: 11 additions & 11 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type SendOptions struct {
// The caller is responsible for ordering the replicas in the slice according to
// the order in which the should be tried.
type TransportFactory func(
SendOptions, *nodedialer.Dialer, ReplicaSlice,
SendOptions, *nodedialer.Dialer, roachpb.ReplicaSet,
) (Transport, error)

// Transport objects can send RPCs to one or more replicas of a range.
Expand Down Expand Up @@ -103,24 +103,24 @@ const (
// During race builds, we wrap this to hold on to and read all obtained
// requests in a tight loop, exposing data races; see transport_race.go.
func grpcTransportFactoryImpl(
opts SendOptions, nodeDialer *nodedialer.Dialer, rs ReplicaSlice,
opts SendOptions, nodeDialer *nodedialer.Dialer, rs roachpb.ReplicaSet,
) (Transport, error) {
transport := grpcTransportPool.Get().(*grpcTransport)
// Grab the saved slice memory from grpcTransport.
replicas := transport.replicas

if cap(replicas) < len(rs) {
replicas = make([]roachpb.ReplicaDescriptor, len(rs))
descriptors := rs.Descriptors()
if cap(replicas) < len(descriptors) {
replicas = make([]roachpb.ReplicaDescriptor, len(descriptors))
} else {
replicas = replicas[:len(rs)]
replicas = replicas[:len(descriptors)]
}

// We'll map the index of the replica descriptor in its slice to its health.
var health util.FastIntMap
for i := range rs {
r := &rs[i]
replicas[i] = r.ReplicaDescriptor
healthy := nodeDialer.ConnHealth(r.NodeID, opts.class) == nil
for i, desc := range descriptors {
replicas[i] = desc
healthy := nodeDialer.ConnHealth(desc.NodeID, opts.class) == nil
if healthy {
health.Set(i, healthHealthy)
} else {
Expand Down Expand Up @@ -320,10 +320,10 @@ func (h *byHealth) Less(i, j int) bool {
// without a full RPC stack.
func SenderTransportFactory(tracer *tracing.Tracer, sender kv.Sender) TransportFactory {
return func(
_ SendOptions, _ *nodedialer.Dialer, replicas ReplicaSlice,
_ SendOptions, _ *nodedialer.Dialer, replicas roachpb.ReplicaSet,
) (Transport, error) {
// Always send to the first replica.
replica := replicas[0].ReplicaDescriptor
replica := replicas.First()
return &senderTransport{tracer, sender, replica, false}, nil
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/transport_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -92,7 +93,7 @@ func (tr raceTransport) SendNext(
// a) the server doesn't hold on to any memory, and
// b) the server doesn't mutate the request
func GRPCTransportFactory(
opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice,
opts SendOptions, nodeDialer *nodedialer.Dialer, replicas roachpb.ReplicaSet,
) (Transport, error) {
if atomic.AddInt32(&running, 1) <= 1 {
if err := nodeDialer.Stopper().RunAsyncTask(
Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/kvclient/kvcoord/transport_regular.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@

package kvcoord

import "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
)

// GRPCTransportFactory is the default TransportFactory, using GRPC.
func GRPCTransportFactory(
opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice,
opts SendOptions, nodeDialer *nodedialer.Dialer, replicas roachpb.ReplicaSet,
) (Transport, error) {
return grpcTransportFactoryImpl(opts, nodeDialer, replicas)
}
4 changes: 4 additions & 0 deletions pkg/roachpb/metadata_replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (d ReplicaSet) String() string {
return redact.StringWithoutMarkers(d)
}

func (d ReplicaSet) First() ReplicaDescriptor {
return d.wrapped[0]
}

// Descriptors returns every replica descriptor in the set, including both voter
// replicas and learner replicas. Voter replicas are ordered first in the
// returned slice.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/ambiguous_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestAmbiguousCommit(t *testing.T) {

params.Knobs.KVClient = &kvcoord.ClientTestingKnobs{
TransportFactory: func(
opts kvcoord.SendOptions, nodeDialer *nodedialer.Dialer, replicas kvcoord.ReplicaSlice,
opts kvcoord.SendOptions, nodeDialer *nodedialer.Dialer, replicas roachpb.ReplicaSet,
) (kvcoord.Transport, error) {
transport, err := kvcoord.GRPCTransportFactory(opts, nodeDialer, replicas)
return &interceptingTransport{
Expand Down

0 comments on commit e476a5c

Please sign in to comment.