Skip to content

Commit

Permalink
Merge #121142
Browse files Browse the repository at this point in the history
121142: kvcoord: fix DistSender circuit breaker `tripped` metrics leak r=erikgrinaker a=erikgrinaker

If a tripped circuit breaker is GCed, the `tripped` metric will consider it tripped forever. This patch untrips the breaker during GC, taking care to properly shut down and synchronize with any concurrent probes to avoid metrics leaks.

Resolves #121030.
Epic: none
Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Mar 27, 2024
2 parents 6f5ff8d + 91e7278 commit 6e8d2cd
Showing 1 changed file with 98 additions and 12 deletions.
110 changes: 98 additions & 12 deletions pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,16 +308,37 @@ func (d *DistSenderCircuitBreakers) gcLoop(ctx context.Context) {
cbs++

if idleDuration := cb.lastRequestDuration(nowNanos); idleDuration >= cbGCThreshold {
// Check if we raced with a concurrent delete. We don't expect to, since
// only this loop removes circuit breakers.
if _, ok := d.replicas.LoadAndDelete(key); ok {
// TODO(erikgrinaker): this needs to remove tripped circuit breakers
// from the metrics, otherwise they'll appear as tripped forever.
// However, there are race conditions with concurrent probes that can
// lead to metrics gauge leaks (both positive and negative), so we'll
// have to make sure we reap the probes here first.
// Check if we raced with a concurrent delete or replace. We don't
// expect to, since only this loop removes circuit breakers.
if v, ok := d.replicas.LoadAndDelete(key); ok {
cb = v.(*ReplicaCircuitBreaker)

d.metrics.CircuitBreaker.Replicas.Dec(1)
gced++

// We don't expect a probe to run, since the replica is idle, but we
// may race with a probe launch or there may be a long-running one (if
// e.g. the probe timeout or interval has increased).
//
// Close closedC to stop any running probes and prevent new probes
// from launching. Only we close it, due to the atomic map delete.
close(cb.closedC)

// The circuit breaker may be tripped, and reported as such in
// metrics. A concurrent probe may also be about to trip/untrip it.
// We let the probe's OnProbeDone() be responsible for managing the
// ReplicasTripped gauge to avoid metrics leaks, by untripping the
// breaker when closedC has been closed. To synchronize with a
// concurrent probe, we attempt to launch a new one. Either:
//
// a) no probe is running: we launch a noop probe which will
// immediately call OnProbeDone() and clean up. All future
// probes are noops.
//
// b) a concurrent probe is running: the Probe() call is a noop, but
// when the running probe shuts down in response to closedC,
// OnProbeDone() will clean up.
cb.breaker.Probe()
}
}
return true
Expand Down Expand Up @@ -380,6 +401,10 @@ type ReplicaCircuitBreaker struct {
// also necessary to check inflightReqs>0.
stallSince atomic.Int64

// closedC is closed when the circuit breaker has been GCed. This will shut
// down a running probe, and prevent new probes from launching.
closedC chan struct{}

mu struct {
syncutil.Mutex

Expand Down Expand Up @@ -407,6 +432,7 @@ func newReplicaCircuitBreaker(
rangeID: rangeDesc.RangeID,
startKey: rangeDesc.StartKey.AsRawKey(), // immutable
desc: *replDesc,
closedC: make(chan struct{}),
}
r.breaker = circuit.NewBreaker(circuit.Options{
Name: r.id(),
Expand Down Expand Up @@ -489,6 +515,19 @@ func (r *ReplicaCircuitBreaker) isTripped() bool {
return r.breaker.Signal().IsTripped()
}

// isClosed returns true if this circuit breaker has been closed and GCed.
func (r *ReplicaCircuitBreaker) isClosed() bool {
if r == nil {
return true // circuit breakers disabled
}
select {
case <-r.closedC:
return true
default:
return false
}
}

// Track attempts to start tracking a request with the circuit breaker. If the
// breaker is tripped, returns an error. Otherwise, returns the context to use
// for the send and a token which the caller must call Done() on with the result
Expand Down Expand Up @@ -672,6 +711,13 @@ func (r *ReplicaCircuitBreaker) done(
// handling such that if 1 out of 1000 replicas are stalled we won't fail the
// entire batch.
func (r *ReplicaCircuitBreaker) launchProbe(report func(error), done func()) {
// If this circuit breaker has been closed, don't launch further probes. This
// acts as a synchronization point with circuit breaker GC.
if r.isClosed() {
done()
return
}

ctx := r.d.ambientCtx.AnnotateCtx(context.Background())

name := fmt.Sprintf("distsender-replica-probe-%s", r.id())
Expand Down Expand Up @@ -787,6 +833,12 @@ func (r *ReplicaCircuitBreaker) launchProbe(report func(error), done func()) {
cancelRequests(cbWriteRequest)
writeGraceTimer.Read = true
writeGraceTimer.Stop() // sets C = nil
case <-r.closedC:
// The circuit breaker has been GCed, exit. We could cancel the context
// instead to also abort an in-flight probe, but that requires extra
// synchronization with circuit breaker GC (a probe may be launching but
// haven't yet installed its cancel function). This is simpler.
return
case <-r.d.stopper.ShouldQuiesce():
return
case <-ctx.Done():
Expand Down Expand Up @@ -921,6 +973,17 @@ func (r *ReplicaCircuitBreaker) OnTrip(b *circuit.Breaker, prev, cur error) {

// OnReset implements circuit.EventHandler.
func (r *ReplicaCircuitBreaker) OnReset(b *circuit.Breaker, prev error) {
// If the circuit breaker has been GCed, we don't need to log or record the
// probe success. We do need to decrement ReplicasTripped if we're actually
// tripped though, to avoid metrics leaks. This may be happen either in
// response to an actual probe success, or a noop probe during GC.
if r.isClosed() {
if prev != nil {
r.d.metrics.CircuitBreaker.ReplicasTripped.Dec(1)
}
return
}

// OnReset() is called every time the probe reports a success, regardless
// of whether the breaker was already tripped. Record each probe success,
// but only record untripped breakers when it was already tripped.
Expand All @@ -937,25 +1000,48 @@ func (r *ReplicaCircuitBreaker) OnReset(b *circuit.Breaker, prev error) {

// OnProbeLaunched implements circuit.EventHandler.
func (r *ReplicaCircuitBreaker) OnProbeLaunched(b *circuit.Breaker) {
r.d.metrics.CircuitBreaker.ReplicasProbesRunning.Inc(1)

// If the circuit breaker has been GCed, don't log the probe launch since we
// don't actually spawn a goroutine. We still increment ProbesRunning above to
// avoid metrics leaks when decrementing in OnProbeDone().
if r.isClosed() {
return
}

ctx := r.d.ambientCtx.AnnotateCtx(context.Background())
nowNanos := timeutil.Now().UnixNano()
stallSince := r.stallDuration(nowNanos).Truncate(time.Millisecond)
errorSince := r.errorDuration(nowNanos).Truncate(time.Millisecond)
tripped := r.breaker.Signal().IsTripped()
log.VEventf(ctx, 2, "launching circuit breaker probe for %s (tripped=%t stall=%s error=%s)",
r.id(), tripped, stallSince, errorSince)

r.d.metrics.CircuitBreaker.ReplicasProbesRunning.Inc(1)
}

// OnProbeDone implements circuit.EventHandler.
func (r *ReplicaCircuitBreaker) OnProbeDone(b *circuit.Breaker) {
r.d.metrics.CircuitBreaker.ReplicasProbesRunning.Dec(1)

// If the circuit breaker has been GCed, don't log the probe stopping. We
// still decrement ProbesRunning above to avoid metrics leaks (we don't know
// if the circuit breaker was GCed when OnProbeLaunched was called).
//
// We must also reset the breaker if it's tripped, to avoid ReplicasTripped
// metric gauge leaks. This can either be in response to an already-running
// probe shutting down, or a noop probe launched by GC -- it doesn't matter.
// A concurrent request may then use the untripped breaker, but that's ok
// since it would also use an untripped breaker if it arrived after GC.
if r.isClosed() {
if r.isTripped() {
r.breaker.Reset()
}
return
}

ctx := r.d.ambientCtx.AnnotateCtx(context.Background())
nowNanos := timeutil.Now().UnixNano()
tripped := r.breaker.Signal().IsTripped()
lastRequest := r.lastRequestDuration(nowNanos).Truncate(time.Millisecond)
log.VEventf(ctx, 2, "stopping circuit breaker probe for %s (tripped=%t lastRequest=%s)",
r.id(), tripped, lastRequest)

r.d.metrics.CircuitBreaker.ReplicasProbesRunning.Dec(1)
}

0 comments on commit 6e8d2cd

Please sign in to comment.