From 227c3e7272ba7dd4f407c4e2f9499834e7246df0 Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Tue, 19 Apr 2022 15:05:30 +0100 Subject: [PATCH] closedts: stop workloop on close when remote is unavailable Previously if remote node containing replica was removed, sidetransport connection will keep trying to connect to non-existing node until server is stopped regardless of connection status. This is not good as it will keep trying every second wasting resources and also spamming log with message every 10 seconds. This patch moves liveness check to the top of runloop to terminate it early as soon as connection is closed. Release note: None --- .../closedts/sidetransport/BUILD.bazel | 1 + .../kvserver/closedts/sidetransport/sender.go | 26 +++++---- .../closedts/sidetransport/sender_test.go | 54 +++++++++++++++++++ 3 files changed, 71 insertions(+), 10 deletions(-) diff --git a/pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel b/pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel index cabff419388b..b619b3d13836 100644 --- a/pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel +++ b/pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel @@ -44,6 +44,7 @@ go_test( "//pkg/roachpb", "//pkg/rpc", "//pkg/settings/cluster", + "//pkg/testutils", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvserver/closedts/sidetransport/sender.go b/pkg/kv/kvserver/closedts/sidetransport/sender.go index 00536eb51a45..3b30c582e995 100644 --- a/pkg/kv/kvserver/closedts/sidetransport/sender.go +++ b/pkg/kv/kvserver/closedts/sidetransport/sender.go @@ -107,7 +107,8 @@ type streamState struct { } type connTestingKnobs struct { - beforeSend func(destNodeID roachpb.NodeID, msg *ctpb.Update) + beforeSend func(destNodeID roachpb.NodeID, msg *ctpb.Update) + sleepOnErrOverride time.Duration } // trackedRange contains the information that the side-transport last published @@ -666,6 +667,10 @@ type nodeDialer interface { Dial(ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass) (_ *grpc.ClientConn, err error) } +// On sending errors, we sleep a bit as to not spin on a tripped +// circuit-breaker in the Dialer. +const sleepOnErr = time.Second + // rpcConn is an implementation of conn that is implemented using a gRPC stream. // // The connection will read messages from producer.buf. If the buffer overflows @@ -773,18 +778,23 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) { defer r.cleanupStream(nil /* err */) everyN := log.Every(10 * time.Second) - // On sending errors, we sleep a bit as to not spin on a tripped - // circuit-breaker in the Dialer. - const sleepOnErr = time.Second + errSleepTime := sleepOnErr + if r.testingKnobs.sleepOnErrOverride > 0 { + errSleepTime = r.testingKnobs.sleepOnErrOverride + } + for { if ctx.Err() != nil { return } + if atomic.LoadInt32(&r.closed) > 0 { + return + } if err := r.maybeConnect(ctx, stopper); err != nil { if everyN.ShouldLog() { log.Infof(ctx, "side-transport failed to connect to n%d: %s", r.nodeID, err) } - time.Sleep(sleepOnErr) + time.Sleep(errSleepTime) continue } @@ -797,10 +807,6 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) { if !ok { return } - closed := atomic.LoadInt32(&r.closed) > 0 - if closed { - return - } if msg == nil { // The sequence number we've requested is no longer in the buffer. We @@ -827,7 +833,7 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) { // should have a blocking version of Dial() that we just leave hanging // and get a notification when it succeeds. r.cleanupStream(err) - time.Sleep(sleepOnErr) + time.Sleep(errSleepTime) } } }) diff --git a/pkg/kv/kvserver/closedts/sidetransport/sender_test.go b/pkg/kv/kvserver/closedts/sidetransport/sender_test.go index 84205de9487a..e96992a21e1b 100644 --- a/pkg/kv/kvserver/closedts/sidetransport/sender_test.go +++ b/pkg/kv/kvserver/closedts/sidetransport/sender_test.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "net" + "sync/atomic" "testing" "time" @@ -23,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -555,3 +557,55 @@ func TestSenderReceiverIntegration(t *testing.T) { // Check that the other Receiver is still receiving updates. <-receivers[2].testingKnobs[1].onMsg } + +type failingDialer struct { + dialCount int32 +} + +var _ nodeDialer = &failingDialer{} + +func (f *failingDialer) Dial( + ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass, +) (_ *grpc.ClientConn, err error) { + atomic.AddInt32(&f.dialCount, 1) + return nil, errors.New("failingDialer") +} + +func (f *failingDialer) callCount() int32 { + return atomic.LoadInt32(&f.dialCount) +} + +// TestRPCConnStopOnClose verifies that connections that are closed would stop +// their work loops eagerly even when nodes they are talking to are unreachable. +func TestRPCConnStopOnClose(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + sleepTime := time.Millisecond + + dialer := &failingDialer{} + factory := newRPCConnFactory(dialer, connTestingKnobs{sleepOnErrOverride: sleepTime}) + connection := factory.new(nil, /* sender is not needed as dialer always fails Dial attempts */ + roachpb.NodeID(1)) + connection.run(ctx, stopper) + + // Wait for first dial attempt for sanity reasons. + testutils.SucceedsSoon(t, func() error { + if dialer.callCount() == 0 { + return errors.New("connection didn't dial yet") + } + return nil + }) + connection.close() + // Ensure that dialing stops once connection is stopped. + testutils.SucceedsSoon(t, func() error { + if stopper.NumTasks() > 0 { + return errors.New("connection worker didn't stop yet") + } + return nil + }) +}