Skip to content

Commit

Permalink
Merge pull request #89175 from knz/backport21.2-80164
Browse files Browse the repository at this point in the history
  • Loading branch information
knz authored Oct 4, 2022
2 parents fbd1f00 + 69f5451 commit ef8227d
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 10 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ go_test(
"//pkg/roachpb:with-mocks",
"//pkg/rpc",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
26 changes: 16 additions & 10 deletions pkg/kv/kvserver/closedts/sidetransport/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ type streamState struct {
}

type connTestingKnobs struct {
beforeSend func(destNodeID roachpb.NodeID, msg *ctpb.Update)
beforeSend func(destNodeID roachpb.NodeID, msg *ctpb.Update)
sleepOnErrOverride time.Duration
}

// trackedRange contains the information that the side-transport last published
Expand Down Expand Up @@ -666,6 +667,10 @@ type nodeDialer interface {
Dial(ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass) (_ *grpc.ClientConn, err error)
}

// On sending errors, we sleep a bit as to not spin on a tripped
// circuit-breaker in the Dialer.
const sleepOnErr = time.Second

// rpcConn is an implementation of conn that is implemented using a gRPC stream.
//
// The connection will read messages from producer.buf. If the buffer overflows
Expand Down Expand Up @@ -773,18 +778,23 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) {
defer r.cleanupStream(nil /* err */)
everyN := log.Every(10 * time.Second)

// On sending errors, we sleep a bit as to not spin on a tripped
// circuit-breaker in the Dialer.
const sleepOnErr = time.Second
errSleepTime := sleepOnErr
if r.testingKnobs.sleepOnErrOverride > 0 {
errSleepTime = r.testingKnobs.sleepOnErrOverride
}

for {
if ctx.Err() != nil {
return
}
if atomic.LoadInt32(&r.closed) > 0 {
return
}
if err := r.maybeConnect(ctx, stopper); err != nil {
if everyN.ShouldLog() {
log.Infof(ctx, "side-transport failed to connect to n%d: %s", r.nodeID, err)
}
time.Sleep(sleepOnErr)
time.Sleep(errSleepTime)
continue
}

Expand All @@ -797,10 +807,6 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) {
if !ok {
return
}
closed := atomic.LoadInt32(&r.closed) > 0
if closed {
return
}

if msg == nil {
// The sequence number we've requested is no longer in the buffer. We
Expand All @@ -827,7 +833,7 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) {
// should have a blocking version of Dial() that we just leave hanging
// and get a notification when it succeeds.
r.cleanupStream(err)
time.Sleep(sleepOnErr)
time.Sleep(errSleepTime)
}
}
})
Expand Down
54 changes: 54 additions & 0 deletions pkg/kv/kvserver/closedts/sidetransport/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"fmt"
"net"
"sync/atomic"
"testing"
"time"

Expand All @@ -23,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -547,3 +549,55 @@ func TestSenderReceiverIntegration(t *testing.T) {
// Check that the other Receiver is still receiving updates.
<-receivers[2].testingKnobs[1].onMsg
}

type failingDialer struct {
dialCount int32
}

var _ nodeDialer = &failingDialer{}

func (f *failingDialer) Dial(
ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass,
) (_ *grpc.ClientConn, err error) {
atomic.AddInt32(&f.dialCount, 1)
return nil, errors.New("failingDialer")
}

func (f *failingDialer) callCount() int32 {
return atomic.LoadInt32(&f.dialCount)
}

// TestRPCConnStopOnClose verifies that connections that are closed would stop
// their work loops eagerly even when nodes they are talking to are unreachable.
func TestRPCConnStopOnClose(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

sleepTime := time.Millisecond

dialer := &failingDialer{}
factory := newRPCConnFactory(dialer, connTestingKnobs{sleepOnErrOverride: sleepTime})
connection := factory.new(nil, /* sender is not needed as dialer always fails Dial attempts */
roachpb.NodeID(1))
connection.run(ctx, stopper)

// Wait for first dial attempt for sanity reasons.
testutils.SucceedsSoon(t, func() error {
if dialer.callCount() == 0 {
return errors.New("connection didn't dial yet")
}
return nil
})
connection.close()
// Ensure that dialing stops once connection is stopped.
testutils.SucceedsSoon(t, func() error {
if stopper.NumTasks() > 0 {
return errors.New("connection worker didn't stop yet")
}
return nil
})
}

0 comments on commit ef8227d

Please sign in to comment.