Skip to content

Commit

Permalink
Merge #69513
Browse files Browse the repository at this point in the history
69513: kvserver: cleanup liveness retry code r=andreimatei a=andreimatei

This patch improves the code around retries of liveness heartbeat.
Before this patch, if a heartbeat was canceled, it might have resulted
in an AmbiguousResultError, which was turned into an explicitly
retriable error, which was causing a retry but the retry was explicitly
short-circuited on canceled ctx. The convoluted logic was
mis-interpreted by me a fellow on first read. The patch also has the

This patch simplifies the logic by making an AmbiguousResultError caused
by a canceled ctx not result in a retryable error in the first place.
As a nice side-effect, this results in the original error propagating to
the Heartbeat() caller, rather than a vanilla "context canceled"
before.

The patch also adds a test about the behavior on this kind of error,
which was a bug in the past but not tested when it was fixed.

Release note: None

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Sep 24, 2021
2 parents e36b20b + e9eec09 commit 85b1e92
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 15 deletions.
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ type TestingKnobs struct {
SpanConfig ModuleTestingKnobs
SQLLivenessKnobs ModuleTestingKnobs
TelemetryLoggingKnobs ModuleTestingKnobs
DialerKnobs ModuleTestingKnobs
}
18 changes: 12 additions & 6 deletions pkg/kv/kvserver/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,12 +1222,7 @@ func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) {
func (nl *NodeLiveness) updateLiveness(
ctx context.Context, update livenessUpdate, handleCondFailed func(actual Record) error,
) (Record, error) {
for {
// Before each attempt, ensure that the context has not expired.
if err := ctx.Err(); err != nil {
return Record{}, err
}

for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
nl.mu.RLock()
engines := nl.mu.engines
nl.mu.RUnlock()
Expand All @@ -1250,6 +1245,10 @@ func (nl *NodeLiveness) updateLiveness(
}
return written, nil
}
if err := ctx.Err(); err != nil {
return Record{}, err
}
panic("unreachable; should retry until ctx canceled")
}

func (nl *NodeLiveness) updateLivenessAttempt(
Expand Down Expand Up @@ -1320,6 +1319,13 @@ func (nl *NodeLiveness) updateLivenessAttempt(
}
return Record{}, handleCondFailed(Record{Liveness: actualLiveness, raw: tErr.ActualValue.TagAndDataBytes()})
} else if errors.HasType(err, (*roachpb.AmbiguousResultError)(nil)) {
// We generally want to retry ambiguous errors immediately, except if the
// ctx is canceled - in which case the ambiguous error is probably caused
// by the cancellation (and in any case it's pointless to retry with a
// canceled ctx).
if ctx.Err() != nil {
return Record{}, err
}
return Record{}, &errRetryLiveness{err}
} else if errors.HasType(err, (*roachpb.TransactionStatusError)(nil)) {
// 21.2 nodes can return a TransactionStatusError when they should have
Expand Down
79 changes: 79 additions & 0 deletions pkg/kv/kvserver/node_liveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package kvserver_test

import (
"bytes"
"context"
"reflect"
"sort"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -1051,6 +1053,83 @@ func TestNodeLivenessRetryAmbiguousResultError(t *testing.T) {
}
}

// Test that, although a liveness heartbeat is generally retried on
// AmbiguousResultError (see test above), it is not retried when the error is
// caused by a canceled context.
func TestNodeLivenessNoRetryOnAmbiguousResultCausedByCancellation(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
var sem chan struct{}
testingEvalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error {
// Maybe trap a liveness heartbeat.
_, ok := args.Req.(*roachpb.ConditionalPutRequest)
if !ok {
return nil
}
if !bytes.HasPrefix(args.Req.Header().Key, keys.NodeLivenessPrefix) {
return nil
}

if sem == nil {
return nil
}

// Block the request.
sem <- struct{}{}
<-sem
return nil
}
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
TestingEvalFilter: testingEvalFilter,
},
},
DialerKnobs: nodedialer.DialerTestingKnobs{
// We're going to cancel a client RPC context and we want that
// cancellation to disconnect the client from the server. That only
// happens when going through gRPC, not when optimizing gRPC away.
TestingNoLocalClientOptimization: true,
},
},
})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
nl := s.NodeLiveness().(*liveness.NodeLiveness)

// We want to control the heartbeats.
nl.PauseHeartbeatLoopForTest()

sem = make(chan struct{})

l, ok := nl.Self()
assert.True(t, ok)

hbCtx, cancel := context.WithCancel(ctx)
defer cancel()

go func() {
// Wait for a trapped heartbeat.
<-sem
// Cancel the RPC. This should cause the DistSender to return an AmbiguousResultError.
cancel()
}()

err := nl.Heartbeat(hbCtx, l)

// Now that the client has gotten a response, unblock the evaluation on the
// server.
sem <- struct{}{}

// Check that Heartbeat() returned an ambiguous error, and take that as proof
// that the heartbeat wasn't retried.
require.Error(t, err)
require.Equal(t, "result is ambiguous (context done during DistSender.Send: context canceled)", err.Error())
}

func verifyNodeIsDecommissioning(t *testing.T, tc *testcluster.TestCluster, nodeID roachpb.NodeID) {
testutils.SucceedsSoon(t, func() error {
for _, s := range tc.Servers {
Expand Down
49 changes: 41 additions & 8 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,33 @@ type AddressResolver func(roachpb.NodeID) (net.Addr, error)
// it maintains a circuit breaker that prevents rapid connection attempts and
// provides hints to the callers on whether to log the outcome of the operation.
type Dialer struct {
rpcContext *rpc.Context
resolver AddressResolver
rpcContext *rpc.Context
resolver AddressResolver
testingKnobs DialerTestingKnobs

breakers [rpc.NumConnectionClasses]syncutil.IntMap // map[roachpb.NodeID]*wrappedBreaker
}

// DialerOpt contains ocnfiguration options for a Dialer.
type DialerOpt struct {
// TestingKnobs contains testing utilities.
TestingKnobs DialerTestingKnobs
}

// DialerTestingKnobs contains dialer testing options.
type DialerTestingKnobs struct {
// TestingNoLocalClientOptimization, if set, disables the optimization about
// using a direct client for the local node instead of going through gRPC. For
// one, the behavior on cancellation of the client RPC ctx is different: when
// going through gRPC, the framework watches for client ctx cancellation and
// interrupts the RPC. When bypassing gRPC, the client ctx is passed directly
// to the RPC handler.
TestingNoLocalClientOptimization bool
}

// ModuleTestingKnobs implements the ModuleTestingKnobs interface.
func (DialerTestingKnobs) ModuleTestingKnobs() {}

// New initializes a Dialer.
func New(rpcContext *rpc.Context, resolver AddressResolver) *Dialer {
return &Dialer{
Expand All @@ -58,6 +79,13 @@ func New(rpcContext *rpc.Context, resolver AddressResolver) *Dialer {
}
}

// NewWithOpt initializes a Dialer and allows passing in configuration options.
func NewWithOpt(rpcContext *rpc.Context, resolver AddressResolver, opt DialerOpt) *Dialer {
d := New(rpcContext, resolver)
d.testingKnobs = opt.TestingKnobs
return d
}

// Stopper returns this node dialer's Stopper.
// TODO(bdarnell): This is a bit of a hack for kv/transport_race.go
func (n *Dialer) Stopper() *stop.Stopper {
Expand Down Expand Up @@ -124,14 +152,19 @@ func (n *Dialer) DialInternalClient(
if err != nil {
return nil, nil, err
}
if localClient := n.rpcContext.GetLocalInternalClientForAddr(addr.String(), nodeID); localClient != nil {
log.VEvent(ctx, 2, kvbase.RoutingRequestLocallyMsg)

// Create a new context from the existing one with the "local request" field set.
// This tells the handler that this is an in-process request, bypassing ctx.Peer checks.
localCtx := grpcutil.NewLocalRequestContext(ctx)
{
// If we're dialing the local node, don't go through gRPC.
localClient := n.rpcContext.GetLocalInternalClientForAddr(addr.String(), nodeID)
if localClient != nil && !n.testingKnobs.TestingNoLocalClientOptimization {
log.VEvent(ctx, 2, kvbase.RoutingRequestLocallyMsg)

return localCtx, localClient, nil
// Create a new context from the existing one with the "local request" field set.
// This tells the handler that this is an in-process request, bypassing ctx.Peer checks.
localCtx := grpcutil.NewLocalRequestContext(ctx)

return localCtx, localClient, nil
}
}
log.VEventf(ctx, 2, "sending request to %s", addr)
conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), true /* checkBreaker */, class)
Expand Down
9 changes: 8 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,14 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
cfg.Locality,
&cfg.DefaultZoneConfig,
)
nodeDialer := nodedialer.New(rpcContext, gossip.AddressResolver(g))

var dialerKnobs nodedialer.DialerTestingKnobs
if dk := cfg.TestingKnobs.DialerKnobs; dk != nil {
dialerKnobs = dk.(nodedialer.DialerTestingKnobs)
}

nodeDialer := nodedialer.NewWithOpt(rpcContext, gossip.AddressResolver(g),
nodedialer.DialerOpt{TestingKnobs: dialerKnobs})

runtimeSampler := status.NewRuntimeStatSampler(ctx, clock)
registry.AddMetricStruct(runtimeSampler)
Expand Down

0 comments on commit 85b1e92

Please sign in to comment.