Skip to content

Commit

Permalink
kvclient: replace ReplicaSlice with ReplicaSet for routing
Browse files Browse the repository at this point in the history
This PR replaces ReplicaSlice with ReplicaSet outside of the
DistSender. ReplicaSlice is an internal implementation which
contains information only required for sorting the replicas.
Outside of DistSender the additional sorting information is
unnecessary and unused.

Epic: none
Informs: cockroachdb#112351

Release note: None
  • Loading branch information
andrewbaptist committed Jan 26, 2024
1 parent c69a191 commit ab62138
Show file tree
Hide file tree
Showing 16 changed files with 45 additions and 49 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2301,7 +2301,7 @@ func (ds *DistSender) sendToReplicas(
metrics: &ds.metrics,
dontConsiderConnHealth: ds.dontConsiderConnHealth,
}
transport, err := ds.transportFactory(opts, replicas)
transport, err := ds.transportFactory(opts, replicas.AsReplicaSet())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
}
getInterceptingTransportFactory := func(nID roachpb.NodeID) func(kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) {
return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) {
transport, tErr := factory(options, slice)
interceptor := &interceptingTransport{
Transport: transport,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ func newTransportForRange(
}
replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality)
opts := SendOptions{class: defRangefeedConnClass}
return ds.transportFactory(opts, replicas)
return ds.transportFactory(opts, replicas.AsReplicaSet())
}

// makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
NodeDescs: g,
RPCRetryOptions: &retry.Options{MaxRetries: 10},
Stopper: stopper,
TransportFactory: func(options SendOptions, slice ReplicaSlice) (Transport, error) {
TransportFactory: func(options SendOptions, slice roachpb.ReplicaSet) (Transport, error) {
return transport, nil
},
RangeDescriptorDB: rangeDB,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func makeTransportFactory(
rfStreamEnabled bool, counts *internalClientCounts, wrapFn wrapRangeFeedClientFn,
) func(kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) {
return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) {
transport, err := factory(options, slice)
if err != nil {
return nil, err
Expand Down
27 changes: 6 additions & 21 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ var stubRPCSendFn simpleSendFn = func(
func adaptSimpleTransport(fn simpleSendFn) TransportFactory {
return func(
_ SendOptions,
replicas ReplicaSlice,
replicas roachpb.ReplicaSet,
) (Transport, error) {
return &simpleTransportAdapter{
fn: fn,
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestSendRPCOrder(t *testing.T) {
var verifyCall func(SendOptions, []roachpb.ReplicaDescriptor) error

var transportFactory TransportFactory = func(
opts SendOptions, replicas ReplicaSlice,
opts SendOptions, replicas roachpb.ReplicaSet,
) (Transport, error) {
reps := replicas.Descriptors()
if err := verifyCall(opts, reps); err != nil {
Expand Down Expand Up @@ -3391,7 +3391,7 @@ func TestSenderTransport(t *testing.T) {
) (r *kvpb.BatchResponse, e *kvpb.Error) {
return
},
))(SendOptions{}, ReplicaSlice{{}})
))(SendOptions{}, roachpb.MakeReplicaSet([]roachpb.ReplicaDescriptor{{}}))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -4210,7 +4210,7 @@ func TestConnectionClass(t *testing.T) {
// class will capture the connection class used for the last transport
// created.
var class rpc.ConnectionClass
var transportFactory TransportFactory = func(opts SendOptions, replicas ReplicaSlice) (Transport, error) {
var transportFactory TransportFactory = func(opts SendOptions, replicas roachpb.ReplicaSet) (Transport, error) {
class = opts.class
return adaptSimpleTransport(
func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
Expand Down Expand Up @@ -5287,20 +5287,10 @@ func TestDistSenderComputeNetworkCost(t *testing.T) {
}
}

makeReplicaInfo := func(replicaID int, region string) ReplicaInfo {
return ReplicaInfo{
ReplicaDescriptor: roachpb.ReplicaDescriptor{
ReplicaID: roachpb.ReplicaID(replicaID),
},
Locality: makeLocality(region),
}
}

for _, tc := range []struct {
name string
cfg *DistSenderConfig
desc *roachpb.RangeDescriptor
replicas ReplicaSlice
curReplica *roachpb.ReplicaDescriptor
expectedRead tenantcostmodel.NetworkCost
expectedWrite tenantcostmodel.NetworkCost
Expand Down Expand Up @@ -5368,12 +5358,7 @@ func TestDistSenderComputeNetworkCost(t *testing.T) {
{Key: "region", Value: "eu-central1"},
}},
},
desc: newRangeDescriptor(10),
replicas: []ReplicaInfo{
makeReplicaInfo(1, "foo"),
makeReplicaInfo(2, "bar"),
makeReplicaInfo(3, ""), // Missing region.
},
desc: newRangeDescriptor(10),
curReplica: &roachpb.ReplicaDescriptor{ReplicaID: 3},
expectedRead: 0,
expectedWrite: 0,
Expand Down Expand Up @@ -5479,7 +5464,7 @@ func TestDistSenderComputeNetworkCost(t *testing.T) {
tc.cfg.Stopper = stopper
tc.cfg.RangeDescriptorDB = rddb
tc.cfg.Settings = st
tc.cfg.TransportFactory = func(SendOptions, ReplicaSlice) (Transport, error) {
tc.cfg.TransportFactory = func(SendOptions, roachpb.ReplicaSet) (Transport, error) {
assert.Fail(t, "test should not try and use the transport factory")
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/local_test_cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewDistSenderForLocalTestCluster(
Stopper: stopper,
RPCRetryOptions: &retryOpts,
FirstRangeProvider: g,
TransportFactory: func(opts SendOptions, replicas ReplicaSlice) (Transport, error) {
TransportFactory: func(opts SendOptions, replicas roachpb.ReplicaSet) (Transport, error) {
transport, err := senderTransportFactory(opts, replicas)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/range_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func init() {
alphaRangeDescriptorDB = mockRangeDescriptorDBForDescs(
append(alphaRangeDescriptors, TestMetaRangeDescriptor)...,
)
tf = func(options SendOptions, slice ReplicaSlice) (Transport, error) {
tf = func(options SendOptions, slice roachpb.ReplicaSet) (Transport, error) {
panic("transport not set up for use")
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvclient/kvcoord/replayed_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T
}
args.ServerArgs.Knobs.KVClient = &kvcoord.ClientTestingKnobs{
TransportFactory: func(factory kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) {
return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) {
tf, err := factory(options, slice)
if err != nil {
return nil, err
Expand All @@ -74,7 +74,6 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T
}, nil

}

},
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/replica_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
})
}

func (rs ReplicaSlice) AsReplicaSet() roachpb.ReplicaSet {
return roachpb.MakeReplicaSet(rs.Descriptors())
}

// Descriptors returns the ReplicaDescriptors inside the ReplicaSlice.
func (rs ReplicaSlice) Descriptors() []roachpb.ReplicaDescriptor {
reps := make([]roachpb.ReplicaDescriptor, len(rs))
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ func TestSendToOneClient(t *testing.T) {
// firstNErrorTransport is a mock transport that sends an error on
// requests to the first N addresses, then succeeds.
type firstNErrorTransport struct {
replicas ReplicaSlice
replicas roachpb.ReplicaSet
numErrors int
numSent int
}

func (f *firstNErrorTransport) IsExhausted() bool {
return f.numSent >= len(f.replicas)
return f.numSent >= len(f.replicas.Descriptors())
}

func (f *firstNErrorTransport) Release() {}
Expand All @@ -182,7 +182,7 @@ func (f *firstNErrorTransport) NextInternalClient(
}

func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor {
return f.replicas[f.numSent].ReplicaDescriptor
return f.replicas.Descriptors()[f.numSent]
}

func (f *firstNErrorTransport) SkipReplica() {
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestComplexScenarios(t *testing.T) {
t,
func(
_ SendOptions,
replicas ReplicaSlice,
replicas roachpb.ReplicaSet,
) (Transport, error) {
return &firstNErrorTransport{
replicas: replicas,
Expand Down
22 changes: 11 additions & 11 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type SendOptions struct {
//
// The caller is responsible for ordering the replicas in the slice according to
// the order in which the should be tried.
type TransportFactory func(SendOptions, ReplicaSlice) (Transport, error)
type TransportFactory func(SendOptions, roachpb.ReplicaSet) (Transport, error)

// Transport objects can send RPCs to one or more replicas of a range.
// All calls to Transport methods are made from a single thread, so
Expand Down Expand Up @@ -101,24 +101,24 @@ const (
// During race builds, we wrap this to hold on to and read all obtained
// requests in a tight loop, exposing data races; see transport_race.go.
func grpcTransportFactoryImpl(
opts SendOptions, nodeDialer *nodedialer.Dialer, rs ReplicaSlice,
opts SendOptions, nodeDialer *nodedialer.Dialer, rs roachpb.ReplicaSet,
) (Transport, error) {
transport := grpcTransportPool.Get().(*grpcTransport)
// Grab the saved slice memory from grpcTransport.
replicas := transport.replicas

if cap(replicas) < len(rs) {
replicas = make([]roachpb.ReplicaDescriptor, len(rs))
descriptors := rs.Descriptors()
if cap(replicas) < len(descriptors) {
replicas = make([]roachpb.ReplicaDescriptor, len(descriptors))
} else {
replicas = replicas[:len(rs)]
replicas = replicas[:len(descriptors)]
}

// We'll map the index of the replica descriptor in its slice to its health.
var health util.FastIntMap
for i := range rs {
r := &rs[i]
replicas[i] = r.ReplicaDescriptor
healthy := nodeDialer.ConnHealth(r.NodeID, opts.class) == nil
for i, desc := range descriptors {
replicas[i] = desc
healthy := nodeDialer.ConnHealth(desc.NodeID, opts.class) == nil
if healthy {
health.Set(i, healthHealthy)
} else {
Expand Down Expand Up @@ -317,9 +317,9 @@ func (h *byHealth) Less(i, j int) bool {
// Transport. This is useful for tests that want to use DistSender
// without a full RPC stack.
func SenderTransportFactory(tracer *tracing.Tracer, sender kv.Sender) TransportFactory {
return func(_ SendOptions, replicas ReplicaSlice) (Transport, error) {
return func(_ SendOptions, replicas roachpb.ReplicaSet) (Transport, error) {
// Always send to the first replica.
replica := replicas[0].ReplicaDescriptor
replica := replicas.First()
return &senderTransport{tracer, sender, replica, false}, nil
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/transport_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -92,7 +93,7 @@ func (tr raceTransport) SendNext(
// a) the server doesn't hold on to any memory, and
// b) the server doesn't mutate the request
func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory {
return func(opts SendOptions, replicas ReplicaSlice) (Transport, error) {
return func(opts SendOptions, replicas roachpb.ReplicaSet) (Transport, error) {
if atomic.AddInt32(&running, 1) <= 1 {
if err := nodeDialer.Stopper().RunAsyncTask(
context.TODO(), "transport racer", func(ctx context.Context) {
Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/kvclient/kvcoord/transport_regular.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@

package kvcoord

import "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
)

// GRPCTransportFactory is the default TransportFactory, using GRPC.
func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory {
return func(options SendOptions, slice ReplicaSlice) (Transport, error) {
return func(options SendOptions, slice roachpb.ReplicaSet) (Transport, error) {
return grpcTransportFactoryImpl(options, nodeDialer, slice)
}
}
4 changes: 4 additions & 0 deletions pkg/roachpb/metadata_replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (d ReplicaSet) String() string {
return redact.StringWithoutMarkers(d)
}

func (d ReplicaSet) First() ReplicaDescriptor {
return d.wrapped[0]
}

// Descriptors returns every replica descriptor in the set, including both voter
// replicas and learner replicas. Voter replicas are ordered first in the
// returned slice.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/ambiguous_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestAmbiguousCommit(t *testing.T) {

params.Knobs.KVClient = &kvcoord.ClientTestingKnobs{
TransportFactory: func(factory kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) {
return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) {
transport, err := factory(options, slice)
return &interceptingTransport{
Transport: transport,
Expand Down

0 comments on commit ab62138

Please sign in to comment.