diff --git a/pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel b/pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel index 8788cff8d3fe..4a88290c13e4 100644 --- a/pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel +++ b/pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel @@ -44,6 +44,7 @@ go_test( "//pkg/roachpb:with-mocks", "//pkg/rpc", "//pkg/settings/cluster", + "//pkg/testutils", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvserver/closedts/sidetransport/sender.go b/pkg/kv/kvserver/closedts/sidetransport/sender.go index 00536eb51a45..3b30c582e995 100644 --- a/pkg/kv/kvserver/closedts/sidetransport/sender.go +++ b/pkg/kv/kvserver/closedts/sidetransport/sender.go @@ -107,7 +107,8 @@ type streamState struct { } type connTestingKnobs struct { - beforeSend func(destNodeID roachpb.NodeID, msg *ctpb.Update) + beforeSend func(destNodeID roachpb.NodeID, msg *ctpb.Update) + sleepOnErrOverride time.Duration } // trackedRange contains the information that the side-transport last published @@ -666,6 +667,10 @@ type nodeDialer interface { Dial(ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass) (_ *grpc.ClientConn, err error) } +// On sending errors, we sleep a bit as to not spin on a tripped +// circuit-breaker in the Dialer. +const sleepOnErr = time.Second + // rpcConn is an implementation of conn that is implemented using a gRPC stream. // // The connection will read messages from producer.buf. If the buffer overflows @@ -773,18 +778,23 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) { defer r.cleanupStream(nil /* err */) everyN := log.Every(10 * time.Second) - // On sending errors, we sleep a bit as to not spin on a tripped - // circuit-breaker in the Dialer. - const sleepOnErr = time.Second + errSleepTime := sleepOnErr + if r.testingKnobs.sleepOnErrOverride > 0 { + errSleepTime = r.testingKnobs.sleepOnErrOverride + } + for { if ctx.Err() != nil { return } + if atomic.LoadInt32(&r.closed) > 0 { + return + } if err := r.maybeConnect(ctx, stopper); err != nil { if everyN.ShouldLog() { log.Infof(ctx, "side-transport failed to connect to n%d: %s", r.nodeID, err) } - time.Sleep(sleepOnErr) + time.Sleep(errSleepTime) continue } @@ -797,10 +807,6 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) { if !ok { return } - closed := atomic.LoadInt32(&r.closed) > 0 - if closed { - return - } if msg == nil { // The sequence number we've requested is no longer in the buffer. We @@ -827,7 +833,7 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) { // should have a blocking version of Dial() that we just leave hanging // and get a notification when it succeeds. r.cleanupStream(err) - time.Sleep(sleepOnErr) + time.Sleep(errSleepTime) } } }) diff --git a/pkg/kv/kvserver/closedts/sidetransport/sender_test.go b/pkg/kv/kvserver/closedts/sidetransport/sender_test.go index cceae4ae2fe2..4cf7a2c22abb 100644 --- a/pkg/kv/kvserver/closedts/sidetransport/sender_test.go +++ b/pkg/kv/kvserver/closedts/sidetransport/sender_test.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "net" + "sync/atomic" "testing" "time" @@ -23,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -547,3 +549,55 @@ func TestSenderReceiverIntegration(t *testing.T) { // Check that the other Receiver is still receiving updates. <-receivers[2].testingKnobs[1].onMsg } + +type failingDialer struct { + dialCount int32 +} + +var _ nodeDialer = &failingDialer{} + +func (f *failingDialer) Dial( + ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass, +) (_ *grpc.ClientConn, err error) { + atomic.AddInt32(&f.dialCount, 1) + return nil, errors.New("failingDialer") +} + +func (f *failingDialer) callCount() int32 { + return atomic.LoadInt32(&f.dialCount) +} + +// TestRPCConnStopOnClose verifies that connections that are closed would stop +// their work loops eagerly even when nodes they are talking to are unreachable. +func TestRPCConnStopOnClose(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + sleepTime := time.Millisecond + + dialer := &failingDialer{} + factory := newRPCConnFactory(dialer, connTestingKnobs{sleepOnErrOverride: sleepTime}) + connection := factory.new(nil, /* sender is not needed as dialer always fails Dial attempts */ + roachpb.NodeID(1)) + connection.run(ctx, stopper) + + // Wait for first dial attempt for sanity reasons. + testutils.SucceedsSoon(t, func() error { + if dialer.callCount() == 0 { + return errors.New("connection didn't dial yet") + } + return nil + }) + connection.close() + // Ensure that dialing stops once connection is stopped. + testutils.SucceedsSoon(t, func() error { + if stopper.NumTasks() > 0 { + return errors.New("connection worker didn't stop yet") + } + return nil + }) +}