Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc/nodedialer: avoid tripping breaker on context errors #35433

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ const (
// leader lease active duration should be of the raft election timeout.
defaultRangeLeaseRaftElectionTimeoutMultiplier = 3

// defaultHeartbeatInterval is the default value of HeartbeatInterval used
// by the rpc context.
defaultHeartbeatInterval = 3 * time.Second

// rangeLeaseRenewalFraction specifies what fraction the range lease
// renewal duration should be of the range lease active time. For example,
// with a value of 0.2 and a lease duration of 10 seconds, leases would be
Expand Down Expand Up @@ -178,6 +182,11 @@ type Config struct {
// it is set to the arbitrary length of six times the Metrics sample interval.
// See the comment in server.Config for more details.
HistogramWindowInterval time.Duration

// HeartbeatInterval controls how often a Ping request is sent on peer
// connections to determine connection health and update the local view
// of remote clocks.
HeartbeatInterval time.Duration
}

func wrapError(err error) error {
Expand All @@ -200,6 +209,7 @@ func (cfg *Config) InitDefaults() {
cfg.HTTPAddr = defaultHTTPAddr
cfg.SSLCertsDir = DefaultCertsDirectory
cfg.certificateManager = lazyCertificateManager{}
cfg.HeartbeatInterval = defaultHeartbeatInterval
}

// HTTPRequestScheme returns "http" or "https" based on the value of Insecure.
Expand Down
7 changes: 3 additions & 4 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func init() {
}

const (
defaultHeartbeatInterval = 3 * time.Second
// The coefficient by which the maximum offset is multiplied to determine the
// maximum acceptable measurement latency.
maximumPingDurationMult = 2
Expand Down Expand Up @@ -404,10 +403,10 @@ func NewContext(
var cancel context.CancelFunc
ctx.masterCtx, cancel = context.WithCancel(ambient.AnnotateCtx(context.Background()))
ctx.Stopper = stopper
ctx.heartbeatInterval = baseCtx.HeartbeatInterval
ctx.RemoteClocks = newRemoteClockMonitor(
ctx.LocalClock, 10*defaultHeartbeatInterval, baseCtx.HistogramWindowInterval)
ctx.heartbeatInterval = defaultHeartbeatInterval
ctx.heartbeatTimeout = 2 * defaultHeartbeatInterval
ctx.LocalClock, 10*ctx.heartbeatInterval, baseCtx.HistogramWindowInterval)
ctx.heartbeatTimeout = 2 * ctx.heartbeatInterval

stopper.RunWorker(ctx.masterCtx, func(context.Context) {
<-stopper.ShouldQuiesce()
Expand Down
67 changes: 35 additions & 32 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,33 +83,13 @@ func (n *Dialer) Dial(ctx context.Context, nodeID roachpb.NodeID) (_ *grpc.Clien
return nil, ctxErr
}
breaker := n.getBreaker(nodeID)

if !breaker.Ready() {
err := errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID)
return nil, err
}

defer func() {
// Enforce a minimum interval between warnings for failed connections.
if err != nil && breaker.ShouldLog() {
log.Infof(ctx, "unable to connect to n%d: %s", nodeID, err)
}
}()

addr, err := n.resolver(nodeID)
if err != nil {
err = errors.Wrapf(err, "failed to resolve n%d", nodeID)
breaker.Fail(err)
return nil, err
}
conn, err := n.rpcContext.GRPCDial(addr.String()).Connect(ctx)
if err != nil {
err = errors.Wrapf(err, "failed to grpc dial n%d at %v", nodeID, addr)
breaker.Fail(err)
return nil, err
}
breaker.Success()
return conn, nil
return n.dial(ctx, nodeID, addr, breaker)
}

// DialInternalClient is a specialization of Dial for callers that
Expand All @@ -124,10 +104,6 @@ func (n *Dialer) DialInternalClient(
if n == nil || n.resolver == nil {
return nil, nil, errors.New("no node dialer configured")
}
// Don't trip the breaker if we're already canceled.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, nil, ctxErr
}
addr, err := n.resolver(nodeID)
if err != nil {
return nil, nil, err
Expand All @@ -141,32 +117,59 @@ func (n *Dialer) DialInternalClient(

return localCtx, localClient, nil
}

breaker := n.getBreaker(nodeID)

log.VEventf(ctx, 2, "sending request to %s", addr)
conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID))
if err != nil {
return nil, nil, err
}
return ctx, roachpb.NewInternalClient(conn), err
}

// dial performs the dialing of the remove connection.
func (n *Dialer) dial(
ctx context.Context, nodeID roachpb.NodeID, addr net.Addr, breaker *wrappedBreaker,
) (_ *grpc.ClientConn, err error) {
// Don't trip the breaker if we're already canceled.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
}
if !breaker.Ready() {
err = errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID)
return nil, err
}
defer func() {
// Enforce a minimum interval between warnings for failed connections.
if err != nil && ctx.Err() == nil && breaker.ShouldLog() {
log.Infof(ctx, "unable to connect to n%d: %s", nodeID, err)
}
}()
conn, err := n.rpcContext.GRPCDial(addr.String()).Connect(ctx)
if err != nil {
// If we were canceled during the dial, don't trip the breaker.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
}
err = errors.Wrapf(err, "failed to connect to n%d at %v", nodeID, addr)
breaker.Fail(err)
return nil, nil, err
return nil, err
}
// Check to see if the connection is in the transient failure state. This can
// happen if the connection already existed, but a recent heartbeat has
// failed and we haven't yet torn down the connection.
if err := grpcutil.ConnectionReady(conn); err != nil {
err = errors.Wrapf(err, "failed to check for connection ready to n%d at %v", nodeID, addr)
err = errors.Wrapf(err, "failed to check for ready connection to n%d at %v", nodeID, addr)
breaker.Fail(err)
return nil, nil, err
return nil, err
}

// TODO(bdarnell): Reconcile the different health checks and circuit breaker
// behavior in this file. Note that this different behavior causes problems
// for higher-levels in the system. For example, DistSQL checks for
// ConnHealth when scheduling processors, but can then see attempts to send
// RPCs fail when dial fails due to an open breaker. Reset the breaker here
// as a stop-gap before the reconciliation occurs.
breaker.Success()
return ctx, roachpb.NewInternalClient(conn), nil
return conn, nil
}

// ConnHealth returns nil if we have an open connection to the given node
Expand Down
Loading