Merge #28855
28855: kv: give Transport a haircut r=nvanbenschoten a=nvanbenschoten

This includes five refactors to `kv.Transport` that will make it much easier to extend for RangeFeed changes. They should also provide modest performance benefits through reduced locking and allocation avoidance. The changes include (a rough sketch of the resulting interface follows the list):
- adding `BatchRequest` to `Transport.SendNext` instead of capturing in `Transport`
- ripping out `Transport.GetPending` and `batchClient.pending`
- ripping out `clientPendingMu` from `grpcTransport`
- removing `nodeID` from `batchClient`
- ripping out `Transport.Close`
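
For reference, here is a rough sketch of the interface after these changes. It is reconstructed from the diffs below rather than copied verbatim from `pkg/kv/transport.go`; the real definition carries more documentation, and imports (`context`, `roachpb`, `nodedialer`) are elided.

```go
// Sketch only: pieced together from this commit's diffs.
type Transport interface {
    // IsExhausted returns true when there are no more replicas to try.
    IsExhausted() bool
    // SendNext sends the batch to the next replica. The request is now an
    // argument instead of state captured when the Transport was built.
    SendNext(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, error)
    // NextReplica returns the replica that the next SendNext call will target.
    NextReplica() roachpb.ReplicaDescriptor
    // MoveToFront moves the given replica to the front of the ordering.
    MoveToFront(roachpb.ReplicaDescriptor)
    // GetPending and Close no longer exist.
}

// The factory no longer receives the BatchRequest either.
type TransportFactory func(
    SendOptions, *nodedialer.Dialer, ReplicaSlice,
) (Transport, error)
```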

No need to review yet. Pushing for CI.

Co-authored-by: Nathan VanBenschoten <[email protected]>
craig[bot] and nvanbenschoten committed Aug 21, 2018
2 parents 2ef6ef2 + e9fd30c commit e62da97
Showing 10 changed files with 144 additions and 214 deletions.
13 changes: 6 additions & 7 deletions pkg/kv/dist_sender.go
@@ -1257,30 +1257,29 @@ func (ds *DistSender) sendToReplicas(
     opts SendOptions,
     rangeID roachpb.RangeID,
     replicas ReplicaSlice,
-    args roachpb.BatchRequest,
+    ba roachpb.BatchRequest,
     nodeDialer *nodedialer.Dialer,
 ) (*roachpb.BatchResponse, error) {
     var ambiguousError error
     var haveCommit bool
     // We only check for committed txns, not aborts because aborts may
     // be retried without any risk of inconsistencies.
-    if etArg, ok := args.GetArg(roachpb.EndTransaction); ok {
+    if etArg, ok := ba.GetArg(roachpb.EndTransaction); ok {
         haveCommit = etArg.(*roachpb.EndTransactionRequest).Commit
     }

-    transport, err := ds.transportFactory(opts, nodeDialer, replicas, args)
+    transport, err := ds.transportFactory(opts, nodeDialer, replicas)
     if err != nil {
         return nil, err
     }
-    defer transport.Close()
     if transport.IsExhausted() {
         return nil, roachpb.NewSendError(
             fmt.Sprintf("sending to all %d replicas failed", len(replicas)))
     }

     curReplica := transport.NextReplica()
-    log.VEventf(ctx, 2, "r%d: sending batch %s to %s", rangeID, args.Summary(), curReplica)
-    br, err := transport.SendNext(ctx)
+    log.VEventf(ctx, 2, "r%d: sending batch %s to %s", rangeID, ba.Summary(), curReplica)
+    br, err := transport.SendNext(ctx, ba)

     // This loop will retry operations that fail with errors that reflect
     // per-replica state and may succeed on other replicas.
@@ -1394,6 +1393,6 @@ func (ds *DistSender) sendToReplicas(
         ds.metrics.NextReplicaErrCount.Inc(1)
         curReplica = transport.NextReplica()
         log.VEventf(ctx, 2, "error: %v %v; trying next peer %s", br, err, curReplica)
-        br, err = transport.SendNext(ctx)
+        br, err = transport.SendNext(ctx, ba)
     }
 }
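
The test adapters further down follow the same pattern, but the key point of the new `SendNext` signature is already visible here: the batch travels as a value parameter, so each attempt can stamp its own target replica on a private copy instead of mutating request state shared with the transport. A minimal illustration, with a hypothetical `listTransport` and a stand-in `send` helper (not the real `grpcTransport`):

```go
// Hypothetical minimal Transport implementation, for illustration only.
type listTransport struct {
    replicas ReplicaSlice
    next     int
}

func (t *listTransport) SendNext(
    ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
    // ba is a local copy, so stamping the replica needs no locking and
    // leaves the caller's request untouched.
    ba.Replica = t.replicas[t.next].ReplicaDescriptor
    t.next++
    return send(ctx, ba) // send stands in for the actual RPC; not a real API
}
```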
25 changes: 8 additions & 17 deletions pkg/kv/dist_sender_test.go
@@ -129,36 +129,31 @@ func adaptSimpleTransport(fn simpleSendFn) TransportFactory {
         opts SendOptions,
         nodeDialer *nodedialer.Dialer,
         replicas ReplicaSlice,
-        args roachpb.BatchRequest,
     ) (Transport, error) {
         return &simpleTransportAdapter{
             fn: fn,
             opts: opts,
-            replicas: replicas,
-            args: args}, nil
+            replicas: replicas}, nil
     }
 }

 type simpleTransportAdapter struct {
     fn simpleSendFn
     opts SendOptions
     replicas ReplicaSlice
-    args roachpb.BatchRequest
     nextReplica int
 }

 func (l *simpleTransportAdapter) IsExhausted() bool {
     return l.nextReplica >= len(l.replicas)
 }

-func (l *simpleTransportAdapter) GetPending() []roachpb.ReplicaDescriptor {
-    return nil
-}
-
-func (l *simpleTransportAdapter) SendNext(ctx context.Context) (*roachpb.BatchResponse, error) {
-    l.args.Replica = l.replicas[l.nextReplica].ReplicaDescriptor
+func (l *simpleTransportAdapter) SendNext(
+    ctx context.Context, ba roachpb.BatchRequest,
+) (*roachpb.BatchResponse, error) {
+    ba.Replica = l.replicas[l.nextReplica].ReplicaDescriptor
     l.nextReplica++
-    return l.fn(ctx, l.opts, l.replicas, l.args)
+    return l.fn(ctx, l.opts, l.replicas, ba)
 }

 func (l *simpleTransportAdapter) NextReplica() roachpb.ReplicaDescriptor {
@@ -171,9 +166,6 @@ func (l *simpleTransportAdapter) NextReplica() roachpb.ReplicaDescriptor {
 func (*simpleTransportAdapter) MoveToFront(roachpb.ReplicaDescriptor) {
 }

-func (*simpleTransportAdapter) Close() {
-}
-
 // TestSendRPCOrder verifies that sendRPC correctly takes into account the
 // lease holder, attributes and required consistency to determine where to send
 // remote requests.
@@ -2044,18 +2036,17 @@ func TestSenderTransport(t *testing.T) {
         ) (r *roachpb.BatchResponse, e *roachpb.Error) {
             return
         },
-    ))(SendOptions{}, &nodedialer.Dialer{}, ReplicaSlice{{}}, roachpb.BatchRequest{})
+    ))(SendOptions{}, &nodedialer.Dialer{}, ReplicaSlice{{}})
     if err != nil {
         t.Fatal(err)
     }
-    _, err = transport.SendNext(context.Background())
+    _, err = transport.SendNext(context.Background(), roachpb.BatchRequest{})
     if err != nil {
         t.Fatal(err)
     }
     if !transport.IsExhausted() {
         t.Fatalf("transport is not exhausted")
     }
-    transport.Close()
 }

 func TestGatewayNodeID(t *testing.T) {
9 changes: 5 additions & 4 deletions pkg/kv/local_test_cluster_util.go
@@ -38,11 +38,13 @@ type localTestClusterTransport struct {
     latency time.Duration
 }

-func (l *localTestClusterTransport) SendNext(ctx context.Context) (*roachpb.BatchResponse, error) {
+func (l *localTestClusterTransport) SendNext(
+    ctx context.Context, ba roachpb.BatchRequest,
+) (*roachpb.BatchResponse, error) {
     if l.latency > 0 {
         time.Sleep(l.latency)
     }
-    return l.Transport.SendNext(ctx)
+    return l.Transport.SendNext(ctx, ba)
 }

 // InitFactoryForLocalTestCluster initializes a TxnCoordSenderFactory
@@ -94,9 +96,8 @@ func NewDistSenderForLocalTestCluster(
         opts SendOptions,
         nodeDialer *nodedialer.Dialer,
         replicas ReplicaSlice,
-        args roachpb.BatchRequest,
     ) (Transport, error) {
-        transport, err := senderTransportFactory(opts, nodeDialer, replicas, args)
+        transport, err := senderTransportFactory(opts, nodeDialer, replicas)
         if err != nil {
             return nil, err
         }
50 changes: 21 additions & 29 deletions pkg/kv/send_test.go
@@ -89,7 +89,6 @@ func TestSendToOneClient(t *testing.T) {
 // requests to the first N addresses, then succeeds.
 type firstNErrorTransport struct {
     replicas ReplicaSlice
-    args roachpb.BatchRequest
     numErrors int
     numSent int
 }
@@ -98,11 +97,9 @@ func (f *firstNErrorTransport) IsExhausted() bool {
     return f.numSent >= len(f.replicas)
 }

-func (f *firstNErrorTransport) GetPending() []roachpb.ReplicaDescriptor {
-    return nil
-}
-
-func (f *firstNErrorTransport) SendNext(_ context.Context) (*roachpb.BatchResponse, error) {
+func (f *firstNErrorTransport) SendNext(
+    _ context.Context, _ roachpb.BatchRequest,
+) (*roachpb.BatchResponse, error) {
     var err error
     if f.numSent < f.numErrors {
         err = roachpb.NewSendError("test")
@@ -118,9 +115,6 @@ func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor {
 func (*firstNErrorTransport) MoveToFront(roachpb.ReplicaDescriptor) {
 }

-func (*firstNErrorTransport) Close() {
-}
-
 // TestComplexScenarios verifies various complex success/failure scenarios by
 // mocking sendOne.
 func TestComplexScenarios(t *testing.T) {
@@ -171,11 +165,9 @@ func TestComplexScenarios(t *testing.T) {
         _ SendOptions,
         _ *nodedialer.Dialer,
         replicas ReplicaSlice,
-        args roachpb.BatchRequest,
     ) (Transport, error) {
         return &firstNErrorTransport{
             replicas: replicas,
-            args: args,
             numErrors: test.numErrors,
         }, nil
     },
@@ -210,40 +202,40 @@ func TestSplitHealthy(t *testing.T) {
     {nil, nil, 0},
     {
         []batchClient{
-            {nodeID: 1, healthy: false},
-            {nodeID: 2, healthy: false},
-            {nodeID: 3, healthy: true},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 1}, healthy: false},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 2}, healthy: false},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 3}, healthy: true},
         },
         []batchClient{
-            {nodeID: 3, healthy: true},
-            {nodeID: 1, healthy: false},
-            {nodeID: 2, healthy: false},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 3}, healthy: true},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 1}, healthy: false},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 2}, healthy: false},
         },
         1,
     },
     {
         []batchClient{
-            {nodeID: 1, healthy: true},
-            {nodeID: 2, healthy: false},
-            {nodeID: 3, healthy: true},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 1}, healthy: true},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 2}, healthy: false},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 3}, healthy: true},
         },
         []batchClient{
-            {nodeID: 1, healthy: true},
-            {nodeID: 3, healthy: true},
-            {nodeID: 2, healthy: false},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 1}, healthy: true},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 3}, healthy: true},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 2}, healthy: false},
         },
         2,
     },
     {
         []batchClient{
-            {nodeID: 1, healthy: true},
-            {nodeID: 2, healthy: true},
-            {nodeID: 3, healthy: true},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 1}, healthy: true},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 2}, healthy: true},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 3}, healthy: true},
         },
         []batchClient{
-            {nodeID: 1, healthy: true},
-            {nodeID: 2, healthy: true},
-            {nodeID: 3, healthy: true},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 1}, healthy: true},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 2}, healthy: true},
+            {replica: roachpb.ReplicaDescriptor{NodeID: 3}, healthy: true},
        },
        3,
    },
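
The `TestSplitHealthy` rewrite above reflects the removal of `batchClient.nodeID`: the node identity now rides along in the full replica descriptor. A sketch of the fields the test exercises, inferred from the diff rather than copied from the gRPC transport (the real struct has more fields, such as the per-replica RPC client):

```go
// Sketch of batchClient as exercised by TestSplitHealthy after this change.
type batchClient struct {
    replica roachpb.ReplicaDescriptor // replaces the removed nodeID field
    healthy bool
    // ...remaining fields omitted.
}
```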