Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient: remove unused error from TransportFactory #121571

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2538,10 +2538,7 @@ func (ds *DistSender) sendToReplicas(
metrics: &ds.metrics,
dontConsiderConnHealth: ds.dontConsiderConnHealth,
}
transport, err := ds.transportFactory(opts, replicas)
if err != nil {
return nil, err
}
transport := ds.transportFactory(opts, replicas)
defer transport.Release()

// inTransferRetry is used to slow down retries in cases where an ongoing
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
}
getInterceptingTransportFactory := func(nID roachpb.NodeID) func(kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) {
transport, tErr := factory(options, slice)
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) kvcoord.Transport {
transport := factory(options, slice)
interceptor := &interceptingTransport{
Transport: transport,
nID: nID,
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
return nil
},
}
return interceptor, tErr
return interceptor
}
}
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,11 +774,7 @@ func (r *ReplicaCircuitBreaker) launchProbe(report func(error), done func()) {
metrics: &r.d.metrics,
dontConsiderConnHealth: true,
}
transport, err := r.d.transportFactory(opts, replicas)
if err != nil {
log.Errorf(ctx, "failed to launch probe: %s", err)
return
}
transport := r.d.transportFactory(opts, replicas)
defer transport.Release()

// Start the write grace timer. Unlike reads, writes can't automatically be
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func newTransportForRange(
}
replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality)
opts := SendOptions{class: defRangefeedConnClass}
return ds.transportFactory(opts, replicas)
return ds.transportFactory(opts, replicas), nil
}

// makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
NodeDescs: g,
RPCRetryOptions: &retry.Options{MaxRetries: 10},
Stopper: stopper,
TransportFactory: func(options SendOptions, slice ReplicaSlice) (Transport, error) {
return transport, nil
TransportFactory: func(SendOptions, ReplicaSlice) Transport {
return transport
},
RangeDescriptorDB: rangeDB,
Settings: cluster.MakeTestingClusterSettings(),
Expand Down
9 changes: 3 additions & 6 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,15 @@ func makeTransportFactory(
rfStreamEnabled bool, counts *internalClientCounts, wrapFn wrapRangeFeedClientFn,
) func(kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) {
transport, err := factory(options, slice)
if err != nil {
return nil, err
}
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) kvcoord.Transport {
transport := factory(options, slice)
countingTransport := &countConnectionsTransport{
Transport: transport,
rfStreamEnabled: rfStreamEnabled,
counts: counts,
wrapRangeFeedClient: wrapFn,
}
return countingTransport, nil
return countingTransport
}
}
}
Expand Down
41 changes: 16 additions & 25 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ func adaptSimpleTransport(fn simpleSendFn) TransportFactory {
return func(
_ SendOptions,
replicas ReplicaSlice,
) (Transport, error) {
) Transport {
return &simpleTransportAdapter{
fn: fn,
replicas: replicas.Descriptors(),
}, nil
}
}
}

Expand Down Expand Up @@ -268,23 +268,19 @@ func TestSendRPCOrder(t *testing.T) {
type replicaTypeMap = map[roachpb.NodeID]roachpb.ReplicaType

// Gets filled below to identify the replica by its address.
makeVerifier := func(expNodes []roachpb.NodeID) func(SendOptions, []roachpb.ReplicaDescriptor) error {
return func(o SendOptions, replicas []roachpb.ReplicaDescriptor) error {
makeVerifier := func(t *testing.T, expNodes []roachpb.NodeID) func(SendOptions, []roachpb.ReplicaDescriptor) {
return func(o SendOptions, replicas []roachpb.ReplicaDescriptor) {
var actualAddrs []roachpb.NodeID
for i, r := range replicas {
if len(expNodes) <= i {
return errors.Errorf("got unexpected replica: %s", r)
}
require.Greater(t, len(expNodes), i)

if expNodes[i] == 0 {
actualAddrs = append(actualAddrs, 0)
} else {
actualAddrs = append(actualAddrs, r.NodeID)
}
}
if !reflect.DeepEqual(expNodes, actualAddrs) {
return errors.Errorf("expected %d, but found %d", expNodes, actualAddrs)
}
return nil
require.Equal(t, expNodes, actualAddrs)
}
}

Expand Down Expand Up @@ -420,15 +416,13 @@ func TestSendRPCOrder(t *testing.T) {
}

// Stub to be changed in each test case.
var verifyCall func(SendOptions, []roachpb.ReplicaDescriptor) error
var verifyCall func(SendOptions, []roachpb.ReplicaDescriptor)

var transportFactory TransportFactory = func(
opts SendOptions, replicas ReplicaSlice,
) (Transport, error) {
) Transport {
reps := replicas.Descriptors()
if err := verifyCall(opts, reps); err != nil {
return nil, err
}
verifyCall(opts, reps)
return adaptSimpleTransport(
func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
return ba.CreateReply(), nil
Expand Down Expand Up @@ -462,7 +456,7 @@ func TestSendRPCOrder(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
verifyCall = makeVerifier(tc.expReplica)
verifyCall = makeVerifier(t, tc.expReplica)

g.NodeID.Reset(6)
cfg.Locality = roachpb.Locality{
Expand Down Expand Up @@ -3491,7 +3485,7 @@ func TestCountRanges(t *testing.T) {
func TestSenderTransport(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
transport, err := SenderTransportFactory(
transport := SenderTransportFactory(
tracing.NewTracer(),
kv.SenderFunc(
func(
Expand All @@ -3501,10 +3495,7 @@ func TestSenderTransport(t *testing.T) {
return
},
))(SendOptions{}, ReplicaSlice{{}})
if err != nil {
t.Fatal(err)
}
_, err = transport.SendNext(context.Background(), &kvpb.BatchRequest{})
_, err := transport.SendNext(context.Background(), &kvpb.BatchRequest{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -4334,7 +4325,7 @@ func TestConnectionClass(t *testing.T) {
// class will capture the connection class used for the last transport
// created.
var class rpc.ConnectionClass
var transportFactory TransportFactory = func(opts SendOptions, replicas ReplicaSlice) (Transport, error) {
var transportFactory TransportFactory = func(opts SendOptions, replicas ReplicaSlice) Transport {
class = opts.class
return adaptSimpleTransport(
func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
Expand Down Expand Up @@ -5619,9 +5610,9 @@ func TestDistSenderComputeNetworkCost(t *testing.T) {
tc.cfg.Stopper = stopper
tc.cfg.RangeDescriptorDB = rddb
tc.cfg.Settings = st
tc.cfg.TransportFactory = func(SendOptions, ReplicaSlice) (Transport, error) {
tc.cfg.TransportFactory = func(SendOptions, ReplicaSlice) Transport {
assert.Fail(t, "test should not try and use the transport factory")
return nil, nil
return nil
}
ds := NewDistSender(*tc.cfg)

Expand Down
9 changes: 3 additions & 6 deletions pkg/kv/kvclient/kvcoord/local_test_cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,9 @@ func NewDistSenderForLocalTestCluster(
Stopper: stopper,
RPCRetryOptions: &retryOpts,
FirstRangeProvider: g,
TransportFactory: func(opts SendOptions, replicas ReplicaSlice) (Transport, error) {
transport, err := senderTransportFactory(opts, replicas)
if err != nil {
return nil, err
}
return &localTestClusterTransport{transport, latency}, nil
TransportFactory: func(opts SendOptions, replicas ReplicaSlice) Transport {
transport := senderTransportFactory(opts, replicas)
return &localTestClusterTransport{transport, latency}
},
})
}
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/range_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func init() {
alphaRangeDescriptorDB = mockRangeDescriptorDBForDescs(
append(alphaRangeDescriptors, TestMetaRangeDescriptor)...,
)
tf = func(options SendOptions, slice ReplicaSlice) (Transport, error) {
tf = func(options SendOptions, slice ReplicaSlice) Transport {
panic("transport not set up for use")
}
}
Expand Down
9 changes: 3 additions & 6 deletions pkg/kv/kvclient/kvcoord/replayed_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,8 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T
}
args.ServerArgs.Knobs.KVClient = &kvcoord.ClientTestingKnobs{
TransportFactory: func(factory kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) {
tf, err := factory(options, slice)
if err != nil {
return nil, err
}
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) kvcoord.Transport {
tf := factory(options, slice)
return &interceptingTransport{
Transport: tf,
afterSend: func(ctx context.Context, req *interceptedReq, resp *interceptedResp) (overrideResp *interceptedResp) {
Expand All @@ -71,7 +68,7 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T
assert.True(t, grpcutil.RequestDidNotStart(err)) // avoid Fatal on goroutine
return &interceptedResp{err: err}
},
}, nil
}

}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ func TestComplexScenarios(t *testing.T) {
func(
_ SendOptions,
replicas ReplicaSlice,
) (Transport, error) {
) Transport {
return &firstNErrorTransport{
replicas: replicas,
numErrors: test.numErrors,
}, nil
}
},
serverAddrs,
rpcContext,
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type SendOptions struct {
//
// The caller is responsible for ordering the replicas in the slice according to
// the order in which the should be tried.
type TransportFactory func(SendOptions, ReplicaSlice) (Transport, error)
type TransportFactory func(SendOptions, ReplicaSlice) Transport

// Transport objects can send RPCs to one or more replicas of a range.
// All calls to Transport methods are made from a single thread, so
Expand Down Expand Up @@ -107,7 +107,7 @@ const (
// requests in a tight loop, exposing data races; see transport_race.go.
func grpcTransportFactoryImpl(
opts SendOptions, nodeDialer *nodedialer.Dialer, rs ReplicaSlice,
) (Transport, error) {
) Transport {
transport := grpcTransportPool.Get().(*grpcTransport)
// Grab the saved slice memory from grpcTransport.
replicas := transport.replicas
Expand Down Expand Up @@ -145,7 +145,7 @@ func grpcTransportFactoryImpl(
transport.splitHealthy()
}

return transport, nil
return transport
}

type grpcTransport struct {
Expand Down Expand Up @@ -326,10 +326,10 @@ func (h *byHealth) Less(i, j int) bool {
// Transport. This is useful for tests that want to use DistSender
// without a full RPC stack.
func SenderTransportFactory(tracer *tracing.Tracer, sender kv.Sender) TransportFactory {
return func(_ SendOptions, replicas ReplicaSlice) (Transport, error) {
return func(_ SendOptions, replicas ReplicaSlice) Transport {
// Always send to the first replica.
replica := replicas[0].ReplicaDescriptor
return &senderTransport{tracer, sender, replica, false}, nil
return &senderTransport{tracer, sender, replica, false}
}
}

Expand Down
9 changes: 3 additions & 6 deletions pkg/kv/kvclient/kvcoord/transport_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (tr raceTransport) SendNext(
// a) the server doesn't hold on to any memory, and
// b) the server doesn't mutate the request
func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory {
return func(opts SendOptions, replicas ReplicaSlice) (Transport, error) {
return func(opts SendOptions, replicas ReplicaSlice) Transport {
if atomic.AddInt32(&running, 1) <= 1 {
if err := nodeDialer.Stopper().RunAsyncTask(
context.TODO(), "transport racer", func(ctx context.Context) {
Expand Down Expand Up @@ -147,10 +147,7 @@ func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory {
}
}

t, err := grpcTransportFactoryImpl(opts, nodeDialer, replicas)
if err != nil {
return nil, err
}
return &raceTransport{Transport: t}, nil
t := grpcTransportFactoryImpl(opts, nodeDialer, replicas)
return &raceTransport{Transport: t}
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/transport_regular.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"

// GRPCTransportFactory is the default TransportFactory, using GRPC.
func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory {
return func(options SendOptions, slice ReplicaSlice) (Transport, error) {
return func(options SendOptions, slice ReplicaSlice) Transport {
return grpcTransportFactoryImpl(options, nodeDialer, slice)
}
}
6 changes: 3 additions & 3 deletions pkg/sql/ambiguous_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func TestAmbiguousCommit(t *testing.T) {

params.Knobs.KVClient = &kvcoord.ClientTestingKnobs{
TransportFactory: func(factory kvcoord.TransportFactory) kvcoord.TransportFactory {
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) {
transport, err := factory(options, slice)
return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) kvcoord.Transport {
transport := factory(options, slice)
return &interceptingTransport{
Transport: transport,
sendNext: func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestAmbiguousCommit(t *testing.T) {
return transport.SendNext(ctx, ba)
}
},
}, err
}
}
},
}
Expand Down
Loading