diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go
index 85b2fc4e762c..779b7601b89b 100644
--- a/pkg/base/testing_knobs.go
+++ b/pkg/base/testing_knobs.go
@@ -44,4 +44,5 @@ type TestingKnobs struct {
 	SpanConfig            ModuleTestingKnobs
 	SQLLivenessKnobs      ModuleTestingKnobs
 	TelemetryLoggingKnobs ModuleTestingKnobs
+	DialerKnobs           ModuleTestingKnobs
 }
diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go
index 65f5f9b539b2..e193bca99565 100644
--- a/pkg/kv/kvserver/liveness/liveness.go
+++ b/pkg/kv/kvserver/liveness/liveness.go
@@ -1222,12 +1222,7 @@ func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) {
 func (nl *NodeLiveness) updateLiveness(
 	ctx context.Context, update livenessUpdate, handleCondFailed func(actual Record) error,
 ) (Record, error) {
-	for {
-		// Before each attempt, ensure that the context has not expired.
-		if err := ctx.Err(); err != nil {
-			return Record{}, err
-		}
-
+	for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
 		nl.mu.RLock()
 		engines := nl.mu.engines
 		nl.mu.RUnlock()
@@ -1250,6 +1245,10 @@ func (nl *NodeLiveness) updateLiveness(
 		}
 		return written, nil
 	}
+	if err := ctx.Err(); err != nil {
+		return Record{}, err
+	}
+	panic("unreachable; should retry until ctx canceled")
 }

 func (nl *NodeLiveness) updateLivenessAttempt(
@@ -1320,6 +1319,13 @@ func (nl *NodeLiveness) updateLivenessAttempt(
 		}
 		return Record{}, handleCondFailed(Record{Liveness: actualLiveness, raw: tErr.ActualValue.TagAndDataBytes()})
 	} else if errors.HasType(err, (*roachpb.AmbiguousResultError)(nil)) {
+		// We generally want to retry ambiguous errors immediately, except if the
+		// ctx is canceled - in which case the ambiguous error is probably caused
+		// by the cancellation (and in any case it's pointless to retry with a
+		// canceled ctx).
+		if ctx.Err() != nil {
+			return Record{}, err
+		}
 		return Record{}, &errRetryLiveness{err}
 	} else if errors.HasType(err, (*roachpb.TransactionStatusError)(nil)) {
 		// 21.2 nodes can return a TransactionStatusError when they should have
diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go
index 174dc7ecba3c..ba149a471fc6 100644
--- a/pkg/kv/kvserver/node_liveness_test.go
+++ b/pkg/kv/kvserver/node_liveness_test.go
@@ -11,6 +11,7 @@ package kvserver_test

 import (
+	"bytes"
 	"context"
 	"reflect"
 	"sort"
@@ -26,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -1051,6 +1053,83 @@ func TestNodeLivenessRetryAmbiguousResultError(t *testing.T) {
 	}
 }

+// Test that, although a liveness heartbeat is generally retried on
+// AmbiguousResultError (see test above), it is not retried when the error is
+// caused by a canceled context.
+func TestNodeLivenessNoRetryOnAmbiguousResultCausedByCancellation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	var sem chan struct{}
+	testingEvalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error {
+		// Maybe trap a liveness heartbeat.
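+		// A liveness heartbeat shows up here as a ConditionalPut on the node's
+		// liveness key; anything else is unrelated traffic and is let through.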
+		_, ok := args.Req.(*roachpb.ConditionalPutRequest)
+		if !ok {
+			return nil
+		}
+		if !bytes.HasPrefix(args.Req.Header().Key, keys.NodeLivenessPrefix) {
+			return nil
+		}
+
+		if sem == nil {
+			return nil
+		}
+
+		// Block the request.
+		sem <- struct{}{}
+		<-sem
+		return nil
+	}
+	serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			Store: &kvserver.StoreTestingKnobs{
+				EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
+					TestingEvalFilter: testingEvalFilter,
+				},
+			},
+			DialerKnobs: nodedialer.DialerTestingKnobs{
+				// We're going to cancel a client RPC context and we want that
+				// cancellation to disconnect the client from the server. That only
+				// happens when going through gRPC, not when optimizing gRPC away.
+				TestingNoLocalClientOptimization: true,
+			},
+		},
+	})
+	s := serv.(*server.TestServer)
+	defer s.Stopper().Stop(ctx)
+	nl := s.NodeLiveness().(*liveness.NodeLiveness)
+
+	// We want to control the heartbeats.
+	nl.PauseHeartbeatLoopForTest()
+
+	sem = make(chan struct{})
+
+	l, ok := nl.Self()
+	assert.True(t, ok)
+
+	hbCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	go func() {
+		// Wait for a trapped heartbeat.
+		<-sem
+		// Cancel the RPC. This should cause the DistSender to return an AmbiguousResultError.
+		cancel()
+	}()
+
+	err := nl.Heartbeat(hbCtx, l)
+
+	// Now that the client has gotten a response, unblock the evaluation on the
+	// server.
+	sem <- struct{}{}
+
+	// Check that Heartbeat() returned an ambiguous error, and take that as proof
+	// that the heartbeat wasn't retried.
+	require.Error(t, err)
+	require.Equal(t, "result is ambiguous (context done during DistSender.Send: context canceled)", err.Error())
+}
+
 func verifyNodeIsDecommissioning(t *testing.T, tc *testcluster.TestCluster, nodeID roachpb.NodeID) {
 	testutils.SucceedsSoon(t, func() error {
 		for _, s := range tc.Servers {
diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go
index ef772eae036f..4d782ececc90 100644
--- a/pkg/rpc/nodedialer/nodedialer.go
+++ b/pkg/rpc/nodedialer/nodedialer.go
@@ -44,12 +44,33 @@ type AddressResolver func(roachpb.NodeID) (net.Addr, error)
 // it maintains a circuit breaker that prevents rapid connection attempts and
 // provides hints to the callers on whether to log the outcome of the operation.
 type Dialer struct {
-	rpcContext *rpc.Context
-	resolver   AddressResolver
+	rpcContext   *rpc.Context
+	resolver     AddressResolver
+	testingKnobs DialerTestingKnobs

 	breakers [rpc.NumConnectionClasses]syncutil.IntMap // map[roachpb.NodeID]*wrappedBreaker
 }

+// DialerOpt contains configuration options for a Dialer.
+type DialerOpt struct {
+	// TestingKnobs contains testing utilities.
+	TestingKnobs DialerTestingKnobs
+}
+
+// DialerTestingKnobs contains dialer testing options.
+type DialerTestingKnobs struct {
+	// TestingNoLocalClientOptimization, if set, disables the optimization of
+	// using a direct client for the local node instead of going through gRPC.
+	// For one, the behavior on cancellation of the client RPC ctx is different:
+	// when going through gRPC, the framework watches for client ctx cancellation
+	// and interrupts the RPC. When bypassing gRPC, the client ctx is passed
+	// directly to the RPC handler.
+	TestingNoLocalClientOptimization bool
+}
+
+// ModuleTestingKnobs implements the ModuleTestingKnobs interface.
+func (DialerTestingKnobs) ModuleTestingKnobs() {}
+
 // New initializes a Dialer.
 func New(rpcContext *rpc.Context, resolver AddressResolver) *Dialer {
 	return &Dialer{
@@ -58,6 +79,13 @@ func New(rpcContext *rpc.Context, resolver AddressResolver) *Dialer {
 	}
 }

+// NewWithOpt initializes a Dialer and allows passing in configuration options.
+func NewWithOpt(rpcContext *rpc.Context, resolver AddressResolver, opt DialerOpt) *Dialer {
+	d := New(rpcContext, resolver)
+	d.testingKnobs = opt.TestingKnobs
+	return d
+}
+
 // Stopper returns this node dialer's Stopper.
 // TODO(bdarnell): This is a bit of a hack for kv/transport_race.go
 func (n *Dialer) Stopper() *stop.Stopper {
@@ -124,14 +152,19 @@ func (n *Dialer) DialInternalClient(
 	if err != nil {
 		return nil, nil, err
 	}
-	if localClient := n.rpcContext.GetLocalInternalClientForAddr(addr.String(), nodeID); localClient != nil {
-		log.VEvent(ctx, 2, kvbase.RoutingRequestLocallyMsg)
-		// Create a new context from the existing one with the "local request" field set.
-		// This tells the handler that this is an in-process request, bypassing ctx.Peer checks.
-		localCtx := grpcutil.NewLocalRequestContext(ctx)
+	{
+		// If we're dialing the local node, don't go through gRPC.
+		localClient := n.rpcContext.GetLocalInternalClientForAddr(addr.String(), nodeID)
+		if localClient != nil && !n.testingKnobs.TestingNoLocalClientOptimization {
+			log.VEvent(ctx, 2, kvbase.RoutingRequestLocallyMsg)

-		return localCtx, localClient, nil
+			// Create a new context from the existing one with the "local request" field set.
+			// This tells the handler that this is an in-process request, bypassing ctx.Peer checks.
+			localCtx := grpcutil.NewLocalRequestContext(ctx)
+
+			return localCtx, localClient, nil
+		}
 	}
 	log.VEventf(ctx, 2, "sending request to %s", addr)
 	conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), true /* checkBreaker */, class)
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 9cac4e7f8030..7ec0f1dc2ad9 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -388,7 +388,14 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		cfg.Locality,
 		&cfg.DefaultZoneConfig,
 	)
-	nodeDialer := nodedialer.New(rpcContext, gossip.AddressResolver(g))
+
+	var dialerKnobs nodedialer.DialerTestingKnobs
+	if dk := cfg.TestingKnobs.DialerKnobs; dk != nil {
+		dialerKnobs = dk.(nodedialer.DialerTestingKnobs)
+	}
+
+	nodeDialer := nodedialer.NewWithOpt(rpcContext, gossip.AddressResolver(g),
+		nodedialer.DialerOpt{TestingKnobs: dialerKnobs})

 	runtimeSampler := status.NewRuntimeStatSampler(ctx, clock)
 	registry.AddMetricStruct(runtimeSampler)
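
For illustration, the control flow established by the liveness change above reduces to the following self-contained Go sketch. It is a toy model, not CockroachDB code: update, doWork, and errAmbiguous are hypothetical stand-ins for updateLiveness, a single heartbeat attempt, and roachpb.AmbiguousResultError. Ambiguous results are retried immediately while the ctx is live; once the ctx is done, the ambiguous error is returned as-is, since the ambiguity was most likely caused by the cancellation itself.

	package main

	import (
		"context"
		"errors"
		"fmt"
		"time"
	)

	// errAmbiguous stands in for roachpb.AmbiguousResultError.
	var errAmbiguous = errors.New("result is ambiguous")

	// update mirrors the retry shape of updateLiveness/updateLivenessAttempt:
	// retry ambiguous results immediately, but once the ctx is done, return
	// the ambiguous error instead of retrying.
	func update(ctx context.Context, doWork func(context.Context) error) error {
		for {
			err := doWork(ctx)
			if err == nil {
				return nil
			}
			if !errors.Is(err, errAmbiguous) {
				return err // non-ambiguous errors are terminal
			}
			if ctx.Err() != nil {
				// The ambiguity was probably caused by the ctx being done,
				// and retrying with a done ctx is pointless anyway.
				return err
			}
			// Ambiguous result with a live ctx: retry.
		}
	}

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
		defer cancel()
		attempts := 0
		err := update(ctx, func(context.Context) error {
			attempts++
			time.Sleep(20 * time.Millisecond) // simulate a slow heartbeat attempt
			return errAmbiguous               // every attempt comes back ambiguous
		})
		fmt.Printf("attempts=%d err=%v\n", attempts, err)
	}

Run as-is, the sketch makes a few attempts and then returns the ambiguous error rather than ctx.Err(), which is the same distinction TestNodeLivenessNoRetryOnAmbiguousResultCausedByCancellation asserts against the real DistSender error string.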