diff --git a/pkg/rpc/peer.go b/pkg/rpc/peer.go
index 3f85fcf1b0b6..5533bead8d6e 100644
--- a/pkg/rpc/peer.go
+++ b/pkg/rpc/peer.go
@@ -240,19 +240,15 @@ func (p *peer) launch(ctx context.Context, report func(error), done func()) {
 	if err := p.opts.Stopper.RunAsyncTask(ctx, taskName, func(ctx context.Context) {
 		p.run(ctx, report, done)
 	}); err != nil {
-		// Stopper draining. Since we're trying to launch a probe, we know the
-		// breaker is tripped. We overwrite the error since we want errQuiescing
-		// (which has a gRPC status), not kvpb.NodeUnavailableError.
-		err = errQuiescing
-		report(err)
-		// We also need to resolve connFuture because a caller may be waiting on
-		// (*Connection).ConnectNoBreaker, and they need to be signaled as well
-		// but aren't listening to the stopper.
-		p.mu.c.connFuture.Resolve(nil, errQuiescing)
+		p.onQuiesce(report)
 		done()
 	}
 }
 
+// run synchronously runs the probe.
+//
+// INVARIANT: p.mu.c is a "fresh" connection (i.e. unresolved connFuture)
+// whenever `run` is invoked.
 func (p *peer) run(ctx context.Context, report func(error), done func()) {
 	var t timeutil.Timer
 	defer t.Stop()
@@ -266,14 +262,9 @@ func (p *peer) run(ctx context.Context, report func(error), done func()) {
 		return
 	}
 
-	// NB: we don't need to close initialHeartbeatDone in these error cases.
-	// Connect() is cancellation-sensitive as well.
 	select {
 	case <-ctx.Done():
-		// Stopper quiescing, node shutting down. Mirroring what breakerProbe.launch
-		// does when it can't launch an async task: leave the broken connection around,
-		// no need to close initialHeartbeatDone, just report errQuiescing and quit.
-		report(errQuiescing)
+		p.onQuiesce(report)
 		return
 	case <-t.C:
 		t.Read = true
@@ -700,6 +691,17 @@ func (p *peer) onHeartbeatFailed(
 	p.ConnectionFailures.Inc(1)
 }
 
+// onQuiesce is called when the probe exits or refuses to start due to
+// quiescing.
+func (p *peer) onQuiesce(report func(error)) {
+	// Stopper quiescing, node shutting down.
+	report(errQuiescing)
+	// NB: it's important that connFuture is resolved, or a caller sitting on
+	// `c.ConnectNoBreaker` would never be unblocked; after all, the probe won't
+	// start again in the future.
+	p.snap().c.connFuture.Resolve(nil, errQuiescing)
+}
+
 func (p PeerSnap) deletable(now time.Time) bool {
 	if p.deleteAfter == 0 {
 		return false