From e42462643127f24a275a457afeb0285bcd7b4ef0 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 20 Aug 2018 14:42:30 -0400 Subject: [PATCH 1/5] kv: add BatchRequest to Transport.SendNext Release note: None --- pkg/kv/dist_sender.go | 12 ++--- pkg/kv/dist_sender_test.go | 17 ++++--- pkg/kv/local_test_cluster_util.go | 9 ++-- pkg/kv/send_test.go | 7 ++- pkg/kv/transport.go | 75 ++++++++++++++++--------------- pkg/kv/transport_race.go | 36 +++++++++++---- pkg/kv/transport_regular.go | 5 +-- pkg/kv/transport_test.go | 20 ++++----- pkg/sql/ambiguous_commit_test.go | 22 ++++----- pkg/storage/client_test.go | 21 ++++----- 10 files changed, 121 insertions(+), 103 deletions(-) diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go index b06d41f5a335..f5c83cfee074 100644 --- a/pkg/kv/dist_sender.go +++ b/pkg/kv/dist_sender.go @@ -1257,18 +1257,18 @@ func (ds *DistSender) sendToReplicas( opts SendOptions, rangeID roachpb.RangeID, replicas ReplicaSlice, - args roachpb.BatchRequest, + ba roachpb.BatchRequest, nodeDialer *nodedialer.Dialer, ) (*roachpb.BatchResponse, error) { var ambiguousError error var haveCommit bool // We only check for committed txns, not aborts because aborts may // be retried without any risk of inconsistencies. - if etArg, ok := args.GetArg(roachpb.EndTransaction); ok { + if etArg, ok := ba.GetArg(roachpb.EndTransaction); ok { haveCommit = etArg.(*roachpb.EndTransactionRequest).Commit } - transport, err := ds.transportFactory(opts, nodeDialer, replicas, args) + transport, err := ds.transportFactory(opts, nodeDialer, replicas) if err != nil { return nil, err } @@ -1279,8 +1279,8 @@ func (ds *DistSender) sendToReplicas( } curReplica := transport.NextReplica() - log.VEventf(ctx, 2, "r%d: sending batch %s to %s", rangeID, args.Summary(), curReplica) - br, err := transport.SendNext(ctx) + log.VEventf(ctx, 2, "r%d: sending batch %s to %s", rangeID, ba.Summary(), curReplica) + br, err := transport.SendNext(ctx, ba) // This loop will retry operations that fail with errors that reflect // per-replica state and may succeed on other replicas. 
@@ -1394,6 +1394,6 @@ func (ds *DistSender) sendToReplicas( ds.metrics.NextReplicaErrCount.Inc(1) curReplica = transport.NextReplica() log.VEventf(ctx, 2, "error: %v %v; trying next peer %s", br, err, curReplica) - br, err = transport.SendNext(ctx) + br, err = transport.SendNext(ctx, ba) } } diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go index f0647ac4e89a..714f378fc4f2 100644 --- a/pkg/kv/dist_sender_test.go +++ b/pkg/kv/dist_sender_test.go @@ -129,13 +129,11 @@ func adaptSimpleTransport(fn simpleSendFn) TransportFactory { opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice, - args roachpb.BatchRequest, ) (Transport, error) { return &simpleTransportAdapter{ fn: fn, opts: opts, - replicas: replicas, - args: args}, nil + replicas: replicas}, nil } } @@ -143,7 +141,6 @@ type simpleTransportAdapter struct { fn simpleSendFn opts SendOptions replicas ReplicaSlice - args roachpb.BatchRequest nextReplica int } @@ -155,10 +152,12 @@ func (l *simpleTransportAdapter) GetPending() []roachpb.ReplicaDescriptor { return nil } -func (l *simpleTransportAdapter) SendNext(ctx context.Context) (*roachpb.BatchResponse, error) { - l.args.Replica = l.replicas[l.nextReplica].ReplicaDescriptor +func (l *simpleTransportAdapter) SendNext( + ctx context.Context, ba roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { + ba.Replica = l.replicas[l.nextReplica].ReplicaDescriptor l.nextReplica++ - return l.fn(ctx, l.opts, l.replicas, l.args) + return l.fn(ctx, l.opts, l.replicas, ba) } func (l *simpleTransportAdapter) NextReplica() roachpb.ReplicaDescriptor { @@ -2044,11 +2043,11 @@ func TestSenderTransport(t *testing.T) { ) (r *roachpb.BatchResponse, e *roachpb.Error) { return }, - ))(SendOptions{}, &nodedialer.Dialer{}, ReplicaSlice{{}}, roachpb.BatchRequest{}) + ))(SendOptions{}, &nodedialer.Dialer{}, ReplicaSlice{{}}) if err != nil { t.Fatal(err) } - _, err = transport.SendNext(context.Background()) + _, err = transport.SendNext(context.Background(), roachpb.BatchRequest{}) if err != nil { t.Fatal(err) } diff --git a/pkg/kv/local_test_cluster_util.go b/pkg/kv/local_test_cluster_util.go index 8f2d1a4d13a3..6501040506f4 100644 --- a/pkg/kv/local_test_cluster_util.go +++ b/pkg/kv/local_test_cluster_util.go @@ -38,11 +38,13 @@ type localTestClusterTransport struct { latency time.Duration } -func (l *localTestClusterTransport) SendNext(ctx context.Context) (*roachpb.BatchResponse, error) { +func (l *localTestClusterTransport) SendNext( + ctx context.Context, ba roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { if l.latency > 0 { time.Sleep(l.latency) } - return l.Transport.SendNext(ctx) + return l.Transport.SendNext(ctx, ba) } // InitFactoryForLocalTestCluster initializes a TxnCoordSenderFactory @@ -94,9 +96,8 @@ func NewDistSenderForLocalTestCluster( opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice, - args roachpb.BatchRequest, ) (Transport, error) { - transport, err := senderTransportFactory(opts, nodeDialer, replicas, args) + transport, err := senderTransportFactory(opts, nodeDialer, replicas) if err != nil { return nil, err } diff --git a/pkg/kv/send_test.go b/pkg/kv/send_test.go index 91bde5eec5a4..ef5439a71346 100644 --- a/pkg/kv/send_test.go +++ b/pkg/kv/send_test.go @@ -89,7 +89,6 @@ func TestSendToOneClient(t *testing.T) { // requests to the first N addresses, then succeeds. 
type firstNErrorTransport struct { replicas ReplicaSlice - args roachpb.BatchRequest numErrors int numSent int } @@ -102,7 +101,9 @@ func (f *firstNErrorTransport) GetPending() []roachpb.ReplicaDescriptor { return nil } -func (f *firstNErrorTransport) SendNext(_ context.Context) (*roachpb.BatchResponse, error) { +func (f *firstNErrorTransport) SendNext( + _ context.Context, _ roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { var err error if f.numSent < f.numErrors { err = roachpb.NewSendError("test") @@ -171,11 +172,9 @@ func TestComplexScenarios(t *testing.T) { _ SendOptions, _ *nodedialer.Dialer, replicas ReplicaSlice, - args roachpb.BatchRequest, ) (Transport, error) { return &firstNErrorTransport{ replicas: replicas, - args: args, numErrors: test.numErrors, }, nil }, diff --git a/pkg/kv/transport.go b/pkg/kv/transport.go index 0aee477a91af..cb2860141196 100644 --- a/pkg/kv/transport.go +++ b/pkg/kv/transport.go @@ -43,7 +43,7 @@ type SendOptions struct { type batchClient struct { nodeID roachpb.NodeID - args roachpb.BatchRequest + replica roachpb.ReplicaDescriptor healthy bool pending bool retryable bool @@ -52,8 +52,8 @@ type batchClient struct { // TransportFactory encapsulates all interaction with the RPC // subsystem, allowing it to be mocked out for testing. The factory -// function returns a Transport object which is used to send the given -// arguments to one or more replicas in the slice. +// function returns a Transport object which is used to send requests +// to one or more replicas in the slice. // // In addition to actually sending RPCs, the transport is responsible // for ordering replicas in accordance with SendOptions.Ordering and @@ -61,9 +61,7 @@ type batchClient struct { // // TODO(bdarnell): clean up this crufty interface; it was extracted // verbatim from the non-abstracted code. -type TransportFactory func( - SendOptions, *nodedialer.Dialer, ReplicaSlice, roachpb.BatchRequest, -) (Transport, error) +type TransportFactory func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) // Transport objects can send RPCs to one or more replicas of a range. // All calls to Transport methods are made from a single thread, so @@ -75,12 +73,12 @@ type Transport interface { // GetPending returns the replica(s) to which requests are still pending. GetPending() []roachpb.ReplicaDescriptor - // SendNext synchronously sends the rpc (captured at creation time) to the - // next replica. May panic if the transport is exhausted. + // SendNext synchronously sends the BatchRequest rpc to the next replica. + // May panic if the transport is exhausted. // // SendNext is also in charge of importing the remotely collected spans (if // any) into the local trace. - SendNext(context.Context) (*roachpb.BatchResponse, error) + SendNext(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, error) // NextReplica returns the replica descriptor of the replica to be tried in // the next call to SendNext. MoveToFront will cause the return value to @@ -104,16 +102,14 @@ type Transport interface { // During race builds, we wrap this to hold on to and read all obtained // requests in a tight loop, exposing data races; see transport_race.go. 
func grpcTransportFactoryImpl( - opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice, args roachpb.BatchRequest, + opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice, ) (Transport, error) { clients := make([]batchClient, 0, len(replicas)) for _, replica := range replicas { - argsCopy := args - argsCopy.Replica = replica.ReplicaDescriptor healthy := nodeDialer.ConnHealth(replica.NodeID) == nil clients = append(clients, batchClient{ nodeID: replica.NodeID, - args: argsCopy, + replica: replica.ReplicaDescriptor, healthy: healthy, }) } @@ -158,7 +154,7 @@ func (gt *grpcTransport) GetPending() []roachpb.ReplicaDescriptor { var pending []roachpb.ReplicaDescriptor for i := range gt.orderedClients { if gt.orderedClients[i].pending { - pending = append(pending, gt.orderedClients[i].args.Replica) + pending = append(pending, gt.orderedClients[i].replica) } } return pending @@ -176,7 +172,7 @@ func (gt *grpcTransport) maybeResurrectRetryablesLocked() bool { } } for _, c := range resurrect { - gt.moveToFrontLocked(c.args.Replica) + gt.moveToFrontLocked(c.replica) } return len(resurrect) > 0 } @@ -184,22 +180,25 @@ func (gt *grpcTransport) maybeResurrectRetryablesLocked() bool { // SendNext invokes the specified RPC on the supplied client when the // client is ready. On success, the reply is sent on the channel; // otherwise an error is sent. -func (gt *grpcTransport) SendNext(ctx context.Context) (*roachpb.BatchResponse, error) { +func (gt *grpcTransport) SendNext( + ctx context.Context, ba roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { client := gt.orderedClients[gt.clientIndex] gt.clientIndex++ - gt.setState(client.args.Replica, true /* pending */, false /* retryable */) + ba.Replica = client.replica + gt.setState(ba.Replica, true /* pending */, false /* retryable */) { var cancel func() ctx, cancel = context.WithCancel(ctx) gt.cancels = append(gt.cancels, cancel) } - return gt.send(ctx, client) + return gt.send(ctx, client, ba) } func (gt *grpcTransport) send( - ctx context.Context, client batchClient, + ctx context.Context, client batchClient, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { reply, err := func() (*roachpb.BatchResponse, error) { // Bail out early if the context is already canceled. (GRPC will @@ -219,12 +218,12 @@ func (gt *grpcTransport) send( if rpc.IsLocal(iface) { gt.opts.metrics.LocalSentCount.Inc(1) } - reply, err := iface.Batch(ctx, &client.args) + reply, err := iface.Batch(ctx, &ba) // If we queried a remote node, perform extra validation and // import trace spans. 
if reply != nil && !rpc.IsLocal(iface) { for i := range reply.Responses { - if err := reply.Responses[i].GetInner().Verify(client.args.Requests[i].GetInner()); err != nil { + if err := reply.Responses[i].GetInner().Verify(ba.Requests[i].GetInner()); err != nil { log.Error(ctx, err) } } @@ -252,7 +251,7 @@ func (gt *grpcTransport) send( retryable = true } } - gt.setState(client.args.Replica, false /* pending */, retryable) + gt.setState(client.replica, false /* pending */, retryable) return reply, err } @@ -261,7 +260,7 @@ func (gt *grpcTransport) NextReplica() roachpb.ReplicaDescriptor { if gt.IsExhausted() { return roachpb.ReplicaDescriptor{} } - return gt.orderedClients[gt.clientIndex].args.Replica + return gt.orderedClients[gt.clientIndex].replica } func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) { @@ -272,7 +271,7 @@ func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) { func (gt *grpcTransport) moveToFrontLocked(replica roachpb.ReplicaDescriptor) { for i := range gt.orderedClients { - if gt.orderedClients[i].args.Replica == replica { + if gt.orderedClients[i].replica == replica { // If a call to this replica is active, don't move it. if gt.orderedClients[i].pending { return @@ -309,7 +308,7 @@ func (gt *grpcTransport) setState(replica roachpb.ReplicaDescriptor, pending, re gt.clientPendingMu.Lock() defer gt.clientPendingMu.Unlock() for i := range gt.orderedClients { - if gt.orderedClients[i].args.Replica == replica { + if gt.orderedClients[i].replica == replica { gt.orderedClients[i].pending = pending gt.orderedClients[i].retryable = retryable if retryable { @@ -348,18 +347,18 @@ func (h byHealth) Less(i, j int) bool { return h[i].healthy && !h[j].healthy } // without a full RPC stack. func SenderTransportFactory(tracer opentracing.Tracer, sender client.Sender) TransportFactory { return func( - _ SendOptions, _ *nodedialer.Dialer, replicas ReplicaSlice, args roachpb.BatchRequest, + _ SendOptions, _ *nodedialer.Dialer, replicas ReplicaSlice, ) (Transport, error) { // Always send to the first replica. 
- args.Replica = replicas[0].ReplicaDescriptor - return &senderTransport{tracer, sender, args, false}, nil + replica := replicas[0].ReplicaDescriptor + return &senderTransport{tracer, sender, replica, false}, nil } } type senderTransport struct { - tracer opentracing.Tracer - sender client.Sender - args roachpb.BatchRequest + tracer opentracing.Tracer + sender client.Sender + replica roachpb.ReplicaDescriptor called bool } @@ -369,19 +368,23 @@ func (s *senderTransport) IsExhausted() bool { } func (s *senderTransport) GetPending() []roachpb.ReplicaDescriptor { - return []roachpb.ReplicaDescriptor{s.args.Replica} + return []roachpb.ReplicaDescriptor{s.replica} } -func (s *senderTransport) SendNext(ctx context.Context) (*roachpb.BatchResponse, error) { +func (s *senderTransport) SendNext( + ctx context.Context, ba roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { if s.called { panic("called an exhausted transport") } s.called = true + sp := s.tracer.StartSpan("node") defer sp.Finish() + ba.Replica = s.replica ctx = opentracing.ContextWithSpan(ctx, sp) - log.Event(ctx, s.args.String()) - br, pErr := s.sender.Send(ctx, s.args) + log.Event(ctx, ba.String()) + br, pErr := s.sender.Send(ctx, ba) if br == nil { br = &roachpb.BatchResponse{} } @@ -411,7 +414,7 @@ func (s *senderTransport) NextReplica() roachpb.ReplicaDescriptor { if s.IsExhausted() { return roachpb.ReplicaDescriptor{} } - return s.args.Replica + return s.replica } func (s *senderTransport) MoveToFront(replica roachpb.ReplicaDescriptor) { diff --git a/pkg/kv/transport_race.go b/pkg/kv/transport_race.go index e10b0fa8c090..b80a010f450e 100644 --- a/pkg/kv/transport_race.go +++ b/pkg/kv/transport_race.go @@ -49,6 +49,27 @@ func jitter(avgInterval time.Duration) time.Duration { return time.Duration(rand.Int63n(int64(2 * avgInterval))) } +// raceTransport wraps a Transport implementation and intercepts all +// BatchRequests, sending them to the transport racer task to read +// them asynchronously in a tight loop. +type raceTransport struct { + Transport +} + +func (tr raceTransport) SendNext( + ctx context.Context, ba roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { + select { + // We have a shallow copy here and so the top level scalar fields can't + // really race, but making more copies doesn't make anything more + // transparent, so from now on we operate on a pointer. + case incoming <- &ba: + default: + // Avoid slowing down the tests if we're backed up. + } + return tr.Transport.SendNext(ctx, ba) +} + // GRPCTransportFactory during race builds wraps the implementation and // intercepts all BatchRequests, reading them asynchronously in a tight loop. // This allows the race detector to catch any mutations of a batch passed to the // neither the client nor the server are allowed to mutate anything and this // transport makes sure they don't. See client.Sender() for more.
func GRPCTransportFactory( - opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice, args roachpb.BatchRequest, + opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice, ) (Transport, error) { if atomic.AddInt32(&running, 1) <= 1 { // NB: We can't use Stopper.RunWorker because doing so would race with @@ -113,13 +134,10 @@ func GRPCTransportFactory( atomic.StoreInt32(&running, 0) } } - select { - // We have a shallow copy here and so the top level scalar fields can't - // really race, but making more copies doesn't make anything more - // transparent, so from now on we operate on a pointer. - case incoming <- &args: - default: - // Avoid slowing down the tests if we're backed up. + + t, err := grpcTransportFactoryImpl(opts, nodeDialer, replicas) + if err != nil { + return nil, err } - return grpcTransportFactoryImpl(opts, nodeDialer, replicas, args) + return &raceTransport{Transport: t}, nil } diff --git a/pkg/kv/transport_regular.go b/pkg/kv/transport_regular.go index 38f3d5c0e1c7..c19b1e9ad017 100644 --- a/pkg/kv/transport_regular.go +++ b/pkg/kv/transport_regular.go @@ -18,13 +18,12 @@ package kv import ( - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" ) // GRPCTransportFactory is the default TransportFactory, using GRPC. func GRPCTransportFactory( - opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice, args roachpb.BatchRequest, + opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice, ) (Transport, error) { - return grpcTransportFactoryImpl(opts, nodeDialer, replicas, args) + return grpcTransportFactoryImpl(opts, nodeDialer, replicas) } diff --git a/pkg/kv/transport_test.go b/pkg/kv/transport_test.go index db40ac5dbd6d..f3dddb33d769 100644 --- a/pkg/kv/transport_test.go +++ b/pkg/kv/transport_test.go @@ -28,18 +28,18 @@ func TestTransportMoveToFront(t *testing.T) { rd2 := roachpb.ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2} rd3 := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3} clients := []batchClient{ - {args: roachpb.BatchRequest{Header: roachpb.Header{Replica: rd1}}}, - {args: roachpb.BatchRequest{Header: roachpb.Header{Replica: rd2}}}, - {args: roachpb.BatchRequest{Header: roachpb.Header{Replica: rd3}}}, + {replica: rd1}, + {replica: rd2}, + {replica: rd3}, } gt := grpcTransport{orderedClients: clients} verifyOrder := func(replicas []roachpb.ReplicaDescriptor) { file, line, _ := caller.Lookup(1) for i, bc := range gt.orderedClients { - if bc.args.Replica != replicas[i] { + if bc.replica != replicas[i] { t.Fatalf("%s:%d: expected order %+v; got mismatch at index %d: %+v", - file, line, replicas, i, bc.args.Replica) + file, line, replicas, i, bc.replica) } } } @@ -64,16 +64,16 @@ func TestTransportMoveToFront(t *testing.T) { gt.MoveToFront(rd3) verifyOrder([]roachpb.ReplicaDescriptor{rd3, rd1, rd2}) if gt.clientIndex != 0 { - t.Fatalf("expected cient index 0; got %d", gt.clientIndex) + t.Fatalf("expected client index 0; got %d", gt.clientIndex) } - // Advance the client index again and verify replica 3 cann + // Advance the client index again and verify replica 3 can // be moved to front for a second retry. gt.clientIndex++ gt.MoveToFront(rd3) verifyOrder([]roachpb.ReplicaDescriptor{rd3, rd1, rd2}) if gt.clientIndex != 0 { - t.Fatalf("expected cient index 0; got %d", gt.clientIndex) + t.Fatalf("expected client index 0; got %d", gt.clientIndex) } // Mark replica 2 no longer pending. Should be able to move it. 
@@ -92,7 +92,7 @@ func TestTransportMoveToFront(t *testing.T) { gt.MoveToFront(rd1) verifyOrder([]roachpb.ReplicaDescriptor{rd2, rd1, rd3}) if gt.clientIndex != 1 { - t.Fatalf("expected cient index 1; got %d", gt.clientIndex) + t.Fatalf("expected client index 1; got %d", gt.clientIndex) } // Advance client index once more; verify second retry. @@ -100,6 +100,6 @@ func TestTransportMoveToFront(t *testing.T) { gt.MoveToFront(rd2) verifyOrder([]roachpb.ReplicaDescriptor{rd1, rd2, rd3}) if gt.clientIndex != 1 { - t.Fatalf("expected cient index 1; got %d", gt.clientIndex) + t.Fatalf("expected client index 1; got %d", gt.clientIndex) } } diff --git a/pkg/sql/ambiguous_commit_test.go b/pkg/sql/ambiguous_commit_test.go index 2ad166613335..2e5099413b25 100644 --- a/pkg/sql/ambiguous_commit_test.go +++ b/pkg/sql/ambiguous_commit_test.go @@ -40,14 +40,16 @@ import ( type interceptingTransport struct { kv.Transport - sendNext func(context.Context) (*roachpb.BatchResponse, error) + sendNext func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, error) } -func (t *interceptingTransport) SendNext(ctx context.Context) (*roachpb.BatchResponse, error) { +func (t *interceptingTransport) SendNext( + ctx context.Context, ba roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { if fn := t.sendNext; fn != nil { - return fn(ctx) + return fn(ctx, ba) } else { - return t.Transport.SendNext(ctx) + return t.Transport.SendNext(ctx, ba) } } @@ -85,13 +87,13 @@ func TestAmbiguousCommit(t *testing.T) { } params.Knobs.KVClient = &kv.ClientTestingKnobs{ - TransportFactory: func(opts kv.SendOptions, nodeDialer *nodedialer.Dialer, replicas kv.ReplicaSlice, args roachpb.BatchRequest) (kv.Transport, error) { - transport, err := kv.GRPCTransportFactory(opts, nodeDialer, replicas, args) + TransportFactory: func(opts kv.SendOptions, nodeDialer *nodedialer.Dialer, replicas kv.ReplicaSlice) (kv.Transport, error) { + transport, err := kv.GRPCTransportFactory(opts, nodeDialer, replicas) return &interceptingTransport{ Transport: transport, - sendNext: func(ctx context.Context) (*roachpb.BatchResponse, error) { + sendNext: func(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) { if ambiguousSuccess { - br, err := transport.SendNext(ctx) + br, err := transport.SendNext(ctx, ba) // During shutdown, we may get responses that // have call.Err set and all we have to do is // not crash on those. 
@@ -105,7 +107,7 @@ func TestAmbiguousCommit(t *testing.T) { } return br, err } else { - if req, ok := args.GetArg(roachpb.ConditionalPut); ok { + if req, ok := ba.GetArg(roachpb.ConditionalPut); ok { if pErr := maybeRPCError(req.(*roachpb.ConditionalPutRequest)); pErr != nil { // Blackhole the RPC and return an // error to simulate an ambiguous @@ -113,7 +115,7 @@ func TestAmbiguousCommit(t *testing.T) { return nil, pErr.GoError() } } - return transport.SendNext(ctx) + return transport.SendNext(ctx, ba) } }, }, err diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go index 0661d3c6b7a9..8a512c870752 100644 --- a/pkg/storage/client_test.go +++ b/pkg/storage/client_test.go @@ -425,7 +425,6 @@ type multiTestContextKVTransport struct { mtc *multiTestContext idx int replicas kv.ReplicaSlice - args roachpb.BatchRequest mu struct { syncutil.Mutex pending map[roachpb.ReplicaID]struct{} @@ -433,12 +432,11 @@ type multiTestContextKVTransport struct { } func (m *multiTestContext) kvTransportFactory( - _ kv.SendOptions, _ *nodedialer.Dialer, replicas kv.ReplicaSlice, args roachpb.BatchRequest, + _ kv.SendOptions, _ *nodedialer.Dialer, replicas kv.ReplicaSlice, ) (kv.Transport, error) { t := &multiTestContextKVTransport{ mtc: m, replicas: replicas, - args: args, } t.mu.pending = map[roachpb.ReplicaID]struct{}{} return t, nil @@ -457,7 +455,7 @@ func (t *multiTestContextKVTransport) GetPending() []roachpb.ReplicaDescriptor { } func (t *multiTestContextKVTransport) SendNext( - ctx context.Context, + ctx context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { if ctx.Err() != nil { return nil, errors.Wrap(ctx.Err(), "send context is canceled") @@ -489,14 +487,13 @@ func (t *multiTestContextKVTransport) SendNext( t.mtc.mu.RLock() sender := t.mtc.senders[nodeIndex] t.mtc.mu.RUnlock() - // Make a copy and clone txn of batch args for sending. - baCopy := t.args - baCopy.Replica = rep.ReplicaDescriptor - if txn := baCopy.Txn; txn != nil { - txnClone := baCopy.Txn.Clone() - baCopy.Txn = &txnClone - } - br, pErr := sender.Send(ctx, baCopy) + // Clone txn of ba args for sending. + ba.Replica = rep.ReplicaDescriptor + if txn := ba.Txn; txn != nil { + txnClone := ba.Txn.Clone() + ba.Txn = &txnClone + } + br, pErr := sender.Send(ctx, ba) if br == nil { br = &roachpb.BatchResponse{} } From f8c7fa4ffad7e82de1e918866899914a6fbcb0ca Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 20 Aug 2018 14:51:01 -0400 Subject: [PATCH 2/5] kv: rip out Transport.GetPending and batchClient.pending These aren't needed anymore. 
Release note: None --- pkg/kv/dist_sender_test.go | 4 ---- pkg/kv/send_test.go | 4 ---- pkg/kv/transport.go | 37 +++++-------------------------------- pkg/kv/transport_test.go | 8 +------- pkg/storage/client_test.go | 4 ---- 5 files changed, 6 insertions(+), 51 deletions(-) diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go index 714f378fc4f2..deb91c554c62 100644 --- a/pkg/kv/dist_sender_test.go +++ b/pkg/kv/dist_sender_test.go @@ -148,10 +148,6 @@ func (l *simpleTransportAdapter) IsExhausted() bool { return l.nextReplica >= len(l.replicas) } -func (l *simpleTransportAdapter) GetPending() []roachpb.ReplicaDescriptor { - return nil -} - func (l *simpleTransportAdapter) SendNext( ctx context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { diff --git a/pkg/kv/send_test.go b/pkg/kv/send_test.go index ef5439a71346..2736fad2229e 100644 --- a/pkg/kv/send_test.go +++ b/pkg/kv/send_test.go @@ -97,10 +97,6 @@ func (f *firstNErrorTransport) IsExhausted() bool { return f.numSent >= len(f.replicas) } -func (f *firstNErrorTransport) GetPending() []roachpb.ReplicaDescriptor { - return nil -} - func (f *firstNErrorTransport) SendNext( _ context.Context, _ roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { diff --git a/pkg/kv/transport.go b/pkg/kv/transport.go index cb2860141196..bf16bcc6eca6 100644 --- a/pkg/kv/transport.go +++ b/pkg/kv/transport.go @@ -45,7 +45,6 @@ type batchClient struct { nodeID roachpb.NodeID replica roachpb.ReplicaDescriptor healthy bool - pending bool retryable bool deadline time.Time } @@ -70,9 +69,6 @@ type Transport interface { // IsExhausted returns true if there are no more replicas to try. IsExhausted() bool - // GetPending returns the replica(s) to which requests are still pending. - GetPending() []roachpb.ReplicaDescriptor - // SendNext synchronously sends the BatchRequest rpc to the next replica. // May panic if the transport is exhausted. // @@ -147,19 +143,6 @@ func (gt *grpcTransport) IsExhausted() bool { return !gt.maybeResurrectRetryablesLocked() } -// GetPending returns the replica(s) to which requests are still pending. 
-func (gt *grpcTransport) GetPending() []roachpb.ReplicaDescriptor { - gt.clientPendingMu.Lock() - defer gt.clientPendingMu.Unlock() - var pending []roachpb.ReplicaDescriptor - for i := range gt.orderedClients { - if gt.orderedClients[i].pending { - pending = append(pending, gt.orderedClients[i].replica) - } - } - return pending -} - // maybeResurrectRetryablesLocked moves already-tried replicas which // experienced a retryable error (currently this means a // NotLeaseHolderError) into a newly-active state so that they can be @@ -167,7 +150,7 @@ func (gt *grpcTransport) GetPending() []roachpb.ReplicaDescriptor { func (gt *grpcTransport) maybeResurrectRetryablesLocked() bool { var resurrect []batchClient for i := 0; i < gt.clientIndex; i++ { - if c := gt.orderedClients[i]; !c.pending && c.retryable && timeutil.Since(c.deadline) >= 0 { + if c := gt.orderedClients[i]; c.retryable && timeutil.Since(c.deadline) >= 0 { resurrect = append(resurrect, c) } } @@ -186,14 +169,13 @@ func (gt *grpcTransport) SendNext( client := gt.orderedClients[gt.clientIndex] gt.clientIndex++ - ba.Replica = client.replica - gt.setState(ba.Replica, true /* pending */, false /* retryable */) - { var cancel func() ctx, cancel = context.WithCancel(ctx) gt.cancels = append(gt.cancels, cancel) } + + ba.Replica = client.replica return gt.send(ctx, client, ba) } @@ -251,7 +233,7 @@ func (gt *grpcTransport) send( retryable = true } } - gt.setState(client.replica, false /* pending */, retryable) + gt.setState(client.replica, retryable) return reply, err } @@ -272,10 +254,6 @@ func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) { func (gt *grpcTransport) moveToFrontLocked(replica roachpb.ReplicaDescriptor) { for i := range gt.orderedClients { if gt.orderedClients[i].replica == replica { - // If a call to this replica is active, don't move it. - if gt.orderedClients[i].pending { - return - } // Clear the retryable bit as this replica is being made // available. gt.orderedClients[i].retryable = false @@ -304,12 +282,11 @@ func (gt *grpcTransport) Close() { // mutate, but the clients reside in a slice which is shuffled via // MoveToFront, making it unsafe to mutate the client through a reference to // the slice. -func (gt *grpcTransport) setState(replica roachpb.ReplicaDescriptor, pending, retryable bool) { +func (gt *grpcTransport) setState(replica roachpb.ReplicaDescriptor, retryable bool) { gt.clientPendingMu.Lock() defer gt.clientPendingMu.Unlock() for i := range gt.orderedClients { if gt.orderedClients[i].replica == replica { - gt.orderedClients[i].pending = pending gt.orderedClients[i].retryable = retryable if retryable { gt.orderedClients[i].deadline = timeutil.Now().Add(time.Second) @@ -367,10 +344,6 @@ func (s *senderTransport) IsExhausted() bool { return s.called } -func (s *senderTransport) GetPending() []roachpb.ReplicaDescriptor { - return []roachpb.ReplicaDescriptor{s.replica} -} - func (s *senderTransport) SendNext( ctx context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { diff --git a/pkg/kv/transport_test.go b/pkg/kv/transport_test.go index f3dddb33d769..f26a79b0c3e1 100644 --- a/pkg/kv/transport_test.go +++ b/pkg/kv/transport_test.go @@ -54,11 +54,6 @@ func TestTransportMoveToFront(t *testing.T) { gt.MoveToFront(rd3) verifyOrder([]roachpb.ReplicaDescriptor{rd3, rd1, rd2}) - // Mark replica 2 pending. Shouldn't be able to move it. 
- clients[2].pending = true - gt.MoveToFront(rd2) - verifyOrder([]roachpb.ReplicaDescriptor{rd3, rd1, rd2}) - // Advance the client index and move replica 3 back to front. gt.clientIndex++ gt.MoveToFront(rd3) @@ -76,8 +71,7 @@ func TestTransportMoveToFront(t *testing.T) { t.Fatalf("expected client index 0; got %d", gt.clientIndex) } - // Mark replica 2 no longer pending. Should be able to move it. - clients[2].pending = false + // Move replica 2 to the front. gt.MoveToFront(rd2) verifyOrder([]roachpb.ReplicaDescriptor{rd2, rd1, rd3}) diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go index 8a512c870752..473c265c4248 100644 --- a/pkg/storage/client_test.go +++ b/pkg/storage/client_test.go @@ -450,10 +450,6 @@ func (t *multiTestContextKVTransport) IsExhausted() bool { return t.idx == len(t.replicas) } -func (t *multiTestContextKVTransport) GetPending() []roachpb.ReplicaDescriptor { - return nil -} - func (t *multiTestContextKVTransport) SendNext( ctx context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { From 5fc2e40537be5107a254066452441d72f61a7ff3 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 20 Aug 2018 14:52:57 -0400 Subject: [PATCH 3/5] kv: rip out clientPendingMu from grpcTransport This no longer needs to be thread-safe. Release note: None --- pkg/kv/transport.go | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/pkg/kv/transport.go b/pkg/kv/transport.go index bf16bcc6eca6..26f630fbaa4a 100644 --- a/pkg/kv/transport.go +++ b/pkg/kv/transport.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" ) @@ -121,13 +120,12 @@ func grpcTransportFactoryImpl( } type grpcTransport struct { - opts SendOptions - nodeDialer *nodedialer.Dialer - clientIndex int - orderedClients []batchClient - clientPendingMu syncutil.Mutex // protects access to all batchClient pending flags - closeWG sync.WaitGroup // waits until all SendNext goroutines are done - cancels []func() // called on Close() + opts SendOptions + nodeDialer *nodedialer.Dialer + clientIndex int + orderedClients []batchClient + closeWG sync.WaitGroup // waits until all SendNext goroutines are done + cancels []func() // called on Close() } // IsExhausted returns false if there are any untried replicas remaining. If @@ -135,8 +133,6 @@ type grpcTransport struct { // failed with a retryable error. If any where resurrected, returns false; // true otherwise. func (gt *grpcTransport) IsExhausted() bool { - gt.clientPendingMu.Lock() - defer gt.clientPendingMu.Unlock() if gt.clientIndex < len(gt.orderedClients) { return false } @@ -246,8 +242,6 @@ func (gt *grpcTransport) NextReplica() roachpb.ReplicaDescriptor { } func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) { - gt.clientPendingMu.Lock() - defer gt.clientPendingMu.Unlock() gt.moveToFrontLocked(replica) } @@ -283,8 +277,6 @@ func (gt *grpcTransport) Close() { // MoveToFront, making it unsafe to mutate the client through a reference to // the slice. 
func (gt *grpcTransport) setState(replica roachpb.ReplicaDescriptor, retryable bool) { - gt.clientPendingMu.Lock() - defer gt.clientPendingMu.Unlock() for i := range gt.orderedClients { if gt.orderedClients[i].replica == replica { gt.orderedClients[i].retryable = retryable From 768d275eabd7ddbf93cc18e1b8be6efc6d9a4344 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 20 Aug 2018 14:55:41 -0400 Subject: [PATCH 4/5] kv: rip out nodeID from batchClient This state is already in ReplicaDescriptor. Release note: None --- pkg/kv/send_test.go | 36 ++++++++++++++++++------------------ pkg/kv/transport.go | 4 +--- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/pkg/kv/send_test.go b/pkg/kv/send_test.go index 2736fad2229e..3e50d8959486 100644 --- a/pkg/kv/send_test.go +++ b/pkg/kv/send_test.go @@ -205,40 +205,40 @@ func TestSplitHealthy(t *testing.T) { {nil, nil, 0}, { []batchClient{ - {nodeID: 1, healthy: false}, - {nodeID: 2, healthy: false}, - {nodeID: 3, healthy: true}, + {replica: roachpb.ReplicaDescriptor{NodeID: 1}, healthy: false}, + {replica: roachpb.ReplicaDescriptor{NodeID: 2}, healthy: false}, + {replica: roachpb.ReplicaDescriptor{NodeID: 3}, healthy: true}, }, []batchClient{ - {nodeID: 3, healthy: true}, - {nodeID: 1, healthy: false}, - {nodeID: 2, healthy: false}, + {replica: roachpb.ReplicaDescriptor{NodeID: 3}, healthy: true}, + {replica: roachpb.ReplicaDescriptor{NodeID: 1}, healthy: false}, + {replica: roachpb.ReplicaDescriptor{NodeID: 2}, healthy: false}, }, 1, }, { []batchClient{ - {nodeID: 1, healthy: true}, - {nodeID: 2, healthy: false}, - {nodeID: 3, healthy: true}, + {replica: roachpb.ReplicaDescriptor{NodeID: 1}, healthy: true}, + {replica: roachpb.ReplicaDescriptor{NodeID: 2}, healthy: false}, + {replica: roachpb.ReplicaDescriptor{NodeID: 3}, healthy: true}, }, []batchClient{ - {nodeID: 1, healthy: true}, - {nodeID: 3, healthy: true}, - {nodeID: 2, healthy: false}, + {replica: roachpb.ReplicaDescriptor{NodeID: 1}, healthy: true}, + {replica: roachpb.ReplicaDescriptor{NodeID: 3}, healthy: true}, + {replica: roachpb.ReplicaDescriptor{NodeID: 2}, healthy: false}, }, 2, }, { []batchClient{ - {nodeID: 1, healthy: true}, - {nodeID: 2, healthy: true}, - {nodeID: 3, healthy: true}, + {replica: roachpb.ReplicaDescriptor{NodeID: 1}, healthy: true}, + {replica: roachpb.ReplicaDescriptor{NodeID: 2}, healthy: true}, + {replica: roachpb.ReplicaDescriptor{NodeID: 3}, healthy: true}, }, []batchClient{ - {nodeID: 1, healthy: true}, - {nodeID: 2, healthy: true}, - {nodeID: 3, healthy: true}, + {replica: roachpb.ReplicaDescriptor{NodeID: 1}, healthy: true}, + {replica: roachpb.ReplicaDescriptor{NodeID: 2}, healthy: true}, + {replica: roachpb.ReplicaDescriptor{NodeID: 3}, healthy: true}, }, 3, }, diff --git a/pkg/kv/transport.go b/pkg/kv/transport.go index 26f630fbaa4a..596755caba8e 100644 --- a/pkg/kv/transport.go +++ b/pkg/kv/transport.go @@ -41,7 +41,6 @@ type SendOptions struct { } type batchClient struct { - nodeID roachpb.NodeID replica roachpb.ReplicaDescriptor healthy bool retryable bool @@ -103,7 +102,6 @@ func grpcTransportFactoryImpl( for _, replica := range replicas { healthy := nodeDialer.ConnHealth(replica.NodeID) == nil clients = append(clients, batchClient{ - nodeID: replica.NodeID, replica: replica.ReplicaDescriptor, healthy: healthy, }) @@ -189,7 +187,7 @@ func (gt *grpcTransport) send( gt.opts.metrics.SentCount.Inc(1) var iface roachpb.InternalClient var err error - ctx, iface, err = gt.nodeDialer.DialInternalClient(ctx, client.nodeID) + ctx, 
iface, err = gt.nodeDialer.DialInternalClient(ctx, client.replica.NodeID) if err != nil { return nil, err } From e9fd30c382d52afc49f813bdc771ebbe0dce5d99 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 20 Aug 2018 15:04:24 -0400 Subject: [PATCH 5/5] kv: rip out Transport.Close This was not needed anymore. This should provide a small but real perf win because we can avoid the context.WithCancel call for every RPC. Release note: None --- pkg/kv/dist_sender.go | 1 - pkg/kv/dist_sender_test.go | 4 ---- pkg/kv/send_test.go | 3 --- pkg/kv/transport.go | 23 ----------------------- pkg/storage/client_test.go | 2 -- 5 files changed, 33 deletions(-) diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go index f5c83cfee074..350e72afb89f 100644 --- a/pkg/kv/dist_sender.go +++ b/pkg/kv/dist_sender.go @@ -1272,7 +1272,6 @@ func (ds *DistSender) sendToReplicas( if err != nil { return nil, err } - defer transport.Close() if transport.IsExhausted() { return nil, roachpb.NewSendError( fmt.Sprintf("sending to all %d replicas failed", len(replicas))) diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go index deb91c554c62..7fd5ceb5e33b 100644 --- a/pkg/kv/dist_sender_test.go +++ b/pkg/kv/dist_sender_test.go @@ -166,9 +166,6 @@ func (l *simpleTransportAdapter) NextReplica() roachpb.ReplicaDescriptor { func (*simpleTransportAdapter) MoveToFront(roachpb.ReplicaDescriptor) { } -func (*simpleTransportAdapter) Close() { -} - // TestSendRPCOrder verifies that sendRPC correctly takes into account the // lease holder, attributes and required consistency to determine where to send // remote requests. @@ -2050,7 +2047,6 @@ func TestSenderTransport(t *testing.T) { if !transport.IsExhausted() { t.Fatalf("transport is not exhausted") } - transport.Close() } func TestGatewayNodeID(t *testing.T) { diff --git a/pkg/kv/send_test.go b/pkg/kv/send_test.go index 3e50d8959486..471511073fb7 100644 --- a/pkg/kv/send_test.go +++ b/pkg/kv/send_test.go @@ -115,9 +115,6 @@ func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor { func (*firstNErrorTransport) MoveToFront(roachpb.ReplicaDescriptor) { } -func (*firstNErrorTransport) Close() { -} - // TestComplexScenarios verifies various complex success/failure scenarios by // mocking sendOne. func TestComplexScenarios(t *testing.T) { diff --git a/pkg/kv/transport.go b/pkg/kv/transport.go index 596755caba8e..8f35bd088fb0 100644 --- a/pkg/kv/transport.go +++ b/pkg/kv/transport.go @@ -18,7 +18,6 @@ package kv import ( "context" "sort" - "sync" "time" "github.com/opentracing/opentracing-go" @@ -84,10 +83,6 @@ type Transport interface { // already been tried, it will be retried. If the specified replica // can't be found, this is a noop. MoveToFront(roachpb.ReplicaDescriptor) - - // Close is called when the transport is no longer needed. It may - // cancel any pending RPCs without writing any response to the channel. - Close() } // grpcTransportFactoryImpl is the default TransportFactory, using GRPC. @@ -122,8 +117,6 @@ type grpcTransport struct { nodeDialer *nodedialer.Dialer clientIndex int orderedClients []batchClient - closeWG sync.WaitGroup // waits until all SendNext goroutines are done - cancels []func() // called on Close() } // IsExhausted returns false if there are any untried replicas remaining. 
If @@ -163,12 +156,6 @@ func (gt *grpcTransport) SendNext( client := gt.orderedClients[gt.clientIndex] gt.clientIndex++ - { - var cancel func() - ctx, cancel = context.WithCancel(ctx) - gt.cancels = append(gt.cancels, cancel) - } - ba.Replica = client.replica return gt.send(ctx, client, ba) } @@ -263,13 +250,6 @@ func (gt *grpcTransport) moveToFrontLocked(replica roachpb.ReplicaDescriptor) { } } -func (gt *grpcTransport) Close() { - for _, cancel := range gt.cancels { - cancel() - } - gt.closeWG.Wait() -} - // NB: this method's callers may have a reference to the client they wish to // mutate, but the clients reside in a slice which is shuffled via // MoveToFront, making it unsafe to mutate the client through a reference to @@ -382,6 +362,3 @@ func (s *senderTransport) NextReplica() roachpb.ReplicaDescriptor { func (s *senderTransport) MoveToFront(replica roachpb.ReplicaDescriptor) { } - -func (s *senderTransport) Close() { -} diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go index 473c265c4248..c36f2e93a267 100644 --- a/pkg/storage/client_test.go +++ b/pkg/storage/client_test.go @@ -557,8 +557,6 @@ func (t *multiTestContextKVTransport) setPending(repID roachpb.ReplicaID, pendin } } -func (*multiTestContextKVTransport) Close() {} - // rangeDescByAge implements sort.Interface for RangeDescriptor, sorting by the // age of the RangeDescriptor. This is intended to find the most recent version // of the same RangeDescriptor, when multiple versions of it are available.
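Reviewer note, not part of the series: below is a minimal sketch of what a mock Transport looks like against the interface as it stands after these five patches, i.e. SendNext takes the BatchRequest per call and GetPending/Close are gone. The type name fixedResponseTransport is invented purely for illustration and does not exist in the tree.

package kv

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// fixedResponseTransport returns an empty BatchResponse from each replica in
// order. It implements only the methods left on Transport after this series:
// IsExhausted, SendNext, NextReplica, and MoveToFront.
type fixedResponseTransport struct {
	replicas ReplicaSlice
	next     int
}

func (t *fixedResponseTransport) IsExhausted() bool {
	return t.next >= len(t.replicas)
}

func (t *fixedResponseTransport) SendNext(
	ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
	// Stamp the request with the target replica per call, mirroring what
	// grpcTransport.SendNext and simpleTransportAdapter.SendNext do now that
	// the batch is no longer captured at transport construction time.
	ba.Replica = t.replicas[t.next].ReplicaDescriptor
	t.next++
	return &roachpb.BatchResponse{}, nil
}

func (t *fixedResponseTransport) NextReplica() roachpb.ReplicaDescriptor {
	if t.IsExhausted() {
		return roachpb.ReplicaDescriptor{}
	}
	return t.replicas[t.next].ReplicaDescriptor
}

func (*fixedResponseTransport) MoveToFront(roachpb.ReplicaDescriptor) {}

Because ba is passed by value on every SendNext call, each attempt works on its own shallow copy, which is what lets grpcTransportFactoryImpl drop the per-replica argsCopy it used to build.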