From 91e72787579190e4b4c4bcbc95d1017ccb8c8808 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 26 Mar 2024 19:39:45 +0000 Subject: [PATCH] kvcoord: fix DistSender circuit breaker `tripped` metrics leak If a tripped circuit breaker is GCed, the `tripped` metric will consider it tripped forever. This patch untrips the breaker during GC, taking care to properly shut down and synchronize with any concurrent probes to avoid metrics leaks. Epic: none Release note: None --- .../kvcoord/dist_sender_circuit_breaker.go | 110 ++++++++++++++++-- 1 file changed, 98 insertions(+), 12 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go b/pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go index c1eed3e592a7..3d725c7bfd7d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go @@ -308,16 +308,37 @@ func (d *DistSenderCircuitBreakers) gcLoop(ctx context.Context) { cbs++ if idleDuration := cb.lastRequestDuration(nowNanos); idleDuration >= cbGCThreshold { - // Check if we raced with a concurrent delete. We don't expect to, since - // only this loop removes circuit breakers. - if _, ok := d.replicas.LoadAndDelete(key); ok { - // TODO(erikgrinaker): this needs to remove tripped circuit breakers - // from the metrics, otherwise they'll appear as tripped forever. - // However, there are race conditions with concurrent probes that can - // lead to metrics gauge leaks (both positive and negative), so we'll - // have to make sure we reap the probes here first. + // Check if we raced with a concurrent delete or replace. We don't + // expect to, since only this loop removes circuit breakers. + if v, ok := d.replicas.LoadAndDelete(key); ok { + cb = v.(*ReplicaCircuitBreaker) + d.metrics.CircuitBreaker.Replicas.Dec(1) gced++ + + // We don't expect a probe to run, since the replica is idle, but we + // may race with a probe launch or there may be a long-running one (if + // e.g. the probe timeout or interval has increased). + // + // Close closedC to stop any running probes and prevent new probes + // from launching. Only we close it, due to the atomic map delete. + close(cb.closedC) + + // The circuit breaker may be tripped, and reported as such in + // metrics. A concurrent probe may also be about to trip/untrip it. + // We let the probe's OnProbeDone() be responsible for managing the + // ReplicasTripped gauge to avoid metrics leaks, by untripping the + // breaker when closedC has been closed. To synchronize with a + // concurrent probe, we attempt to launch a new one. Either: + // + // a) no probe is running: we launch a noop probe which will + // immediately call OnProbeDone() and clean up. All future + // probes are noops. + // + // b) a concurrent probe is running: the Probe() call is a noop, but + // when the running probe shuts down in response to closedC, + // OnProbeDone() will clean up. + cb.breaker.Probe() } } return true @@ -380,6 +401,10 @@ type ReplicaCircuitBreaker struct { // also necessary to check inflightReqs>0. stallSince atomic.Int64 + // closedC is closed when the circuit breaker has been GCed. This will shut + // down a running probe, and prevent new probes from launching. + closedC chan struct{} + mu struct { syncutil.Mutex @@ -407,6 +432,7 @@ func newReplicaCircuitBreaker( rangeID: rangeDesc.RangeID, startKey: rangeDesc.StartKey.AsRawKey(), // immutable desc: *replDesc, + closedC: make(chan struct{}), } r.breaker = circuit.NewBreaker(circuit.Options{ Name: r.id(), @@ -489,6 +515,19 @@ func (r *ReplicaCircuitBreaker) isTripped() bool { return r.breaker.Signal().IsTripped() } +// isClosed returns true if this circuit breaker has been closed and GCed. +func (r *ReplicaCircuitBreaker) isClosed() bool { + if r == nil { + return true // circuit breakers disabled + } + select { + case <-r.closedC: + return true + default: + return false + } +} + // Track attempts to start tracking a request with the circuit breaker. If the // breaker is tripped, returns an error. Otherwise, returns the context to use // for the send and a token which the caller must call Done() on with the result @@ -672,6 +711,13 @@ func (r *ReplicaCircuitBreaker) done( // handling such that if 1 out of 1000 replicas are stalled we won't fail the // entire batch. func (r *ReplicaCircuitBreaker) launchProbe(report func(error), done func()) { + // If this circuit breaker has been closed, don't launch further probes. This + // acts as a synchronization point with circuit breaker GC. + if r.isClosed() { + done() + return + } + ctx := r.d.ambientCtx.AnnotateCtx(context.Background()) name := fmt.Sprintf("distsender-replica-probe-%s", r.id()) @@ -787,6 +833,12 @@ func (r *ReplicaCircuitBreaker) launchProbe(report func(error), done func()) { cancelRequests(cbWriteRequest) writeGraceTimer.Read = true writeGraceTimer.Stop() // sets C = nil + case <-r.closedC: + // The circuit breaker has been GCed, exit. We could cancel the context + // instead to also abort an in-flight probe, but that requires extra + // synchronization with circuit breaker GC (a probe may be launching but + // haven't yet installed its cancel function). This is simpler. + return case <-r.d.stopper.ShouldQuiesce(): return case <-ctx.Done(): @@ -921,6 +973,17 @@ func (r *ReplicaCircuitBreaker) OnTrip(b *circuit.Breaker, prev, cur error) { // OnReset implements circuit.EventHandler. func (r *ReplicaCircuitBreaker) OnReset(b *circuit.Breaker, prev error) { + // If the circuit breaker has been GCed, we don't need to log or record the + // probe success. We do need to decrement ReplicasTripped if we're actually + // tripped though, to avoid metrics leaks. This may be happen either in + // response to an actual probe success, or a noop probe during GC. + if r.isClosed() { + if prev != nil { + r.d.metrics.CircuitBreaker.ReplicasTripped.Dec(1) + } + return + } + // OnReset() is called every time the probe reports a success, regardless // of whether the breaker was already tripped. Record each probe success, // but only record untripped breakers when it was already tripped. @@ -937,6 +1000,15 @@ func (r *ReplicaCircuitBreaker) OnReset(b *circuit.Breaker, prev error) { // OnProbeLaunched implements circuit.EventHandler. func (r *ReplicaCircuitBreaker) OnProbeLaunched(b *circuit.Breaker) { + r.d.metrics.CircuitBreaker.ReplicasProbesRunning.Inc(1) + + // If the circuit breaker has been GCed, don't log the probe launch since we + // don't actually spawn a goroutine. We still increment ProbesRunning above to + // avoid metrics leaks when decrementing in OnProbeDone(). + if r.isClosed() { + return + } + ctx := r.d.ambientCtx.AnnotateCtx(context.Background()) nowNanos := timeutil.Now().UnixNano() stallSince := r.stallDuration(nowNanos).Truncate(time.Millisecond) @@ -944,18 +1016,32 @@ func (r *ReplicaCircuitBreaker) OnProbeLaunched(b *circuit.Breaker) { tripped := r.breaker.Signal().IsTripped() log.VEventf(ctx, 2, "launching circuit breaker probe for %s (tripped=%t stall=%s error=%s)", r.id(), tripped, stallSince, errorSince) - - r.d.metrics.CircuitBreaker.ReplicasProbesRunning.Inc(1) } // OnProbeDone implements circuit.EventHandler. func (r *ReplicaCircuitBreaker) OnProbeDone(b *circuit.Breaker) { + r.d.metrics.CircuitBreaker.ReplicasProbesRunning.Dec(1) + + // If the circuit breaker has been GCed, don't log the probe stopping. We + // still decrement ProbesRunning above to avoid metrics leaks (we don't know + // if the circuit breaker was GCed when OnProbeLaunched was called). + // + // We must also reset the breaker if it's tripped, to avoid ReplicasTripped + // metric gauge leaks. This can either be in response to an already-running + // probe shutting down, or a noop probe launched by GC -- it doesn't matter. + // A concurrent request may then use the untripped breaker, but that's ok + // since it would also use an untripped breaker if it arrived after GC. + if r.isClosed() { + if r.isTripped() { + r.breaker.Reset() + } + return + } + ctx := r.d.ambientCtx.AnnotateCtx(context.Background()) nowNanos := timeutil.Now().UnixNano() tripped := r.breaker.Signal().IsTripped() lastRequest := r.lastRequestDuration(nowNanos).Truncate(time.Millisecond) log.VEventf(ctx, 2, "stopping circuit breaker probe for %s (tripped=%t lastRequest=%s)", r.id(), tripped, lastRequest) - - r.d.metrics.CircuitBreaker.ReplicasProbesRunning.Dec(1) }