Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
121028: kvcoord: don't untrip DistSender circuit breaker on shutdown r=erikgrinaker a=erikgrinaker

Previously, if the node shuts down while a circuit breaker probe is in flight, this could cause the breaker to untrip. It shouldn't.

Epic: none
Release note: None

121029: kvcoord: use single threshold to GC DistSender circuit breakers r=erikgrinaker a=erikgrinaker

This patch uses a common idle timeout to GC both tripped and untripped circuit breakers, and increases the timeout to 20 minutes.

Epic: none
Release note: None

121031: backupccl: disable link external sst requests from seperate process non system tenants r=dt a=msbutler

Fixes #120525

Release note: none

Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
3 people committed Mar 25, 2024
4 parents 899170e + 3bfb402 + 09fe156 + 20be3fd commit 2bd61ba
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 60 deletions.
57 changes: 27 additions & 30 deletions pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,8 @@ var (

const (
// cbGCThreshold is the threshold after which an idle replica's circuit
// breaker will be garbage collected.
cbGCThreshold = 10 * time.Minute

// cbGCThresholdTripped is the threshold after which an idle replica's circuit
// breaker will be garbage collected when tripped. This is greater than
// cbGCThreshold to avoid frequent (un)tripping of rarely accessed replicas.
cbGCThresholdTripped = time.Hour
// breaker will be garbage collected, even when tripped.
cbGCThreshold = 20 * time.Minute

// cbGCInterval is the interval between garbage collection scans.
cbGCInterval = time.Minute
Expand Down Expand Up @@ -268,18 +263,16 @@ func (d *DistSenderCircuitBreakers) gcLoop(ctx context.Context) {
cbs++

if idleDuration := cb.lastRequestDuration(nowNanos); idleDuration >= cbGCThreshold {
if !cb.isTripped() || idleDuration >= cbGCThresholdTripped {
// Check if we raced with a concurrent delete. We don't expect to,
// since only this loop removes circuit breakers.
if _, ok := d.replicas.LoadAndDelete(key); ok {
// TODO(erikgrinaker): this needs to remove tripped circuit breakers
// from the metrics, otherwise they'll appear as tripped forever.
// However, there are race conditions with concurrent probes that
// can lead to metrics gauge leaks (both positive and negative), so
// we'll have to make sure we reap the probes here first.
d.metrics.CircuitBreaker.Replicas.Dec(1)
gced++
}
// Check if we raced with a concurrent delete. We don't expect to, since
// only this loop removes circuit breakers.
if _, ok := d.replicas.LoadAndDelete(key); ok {
// TODO(erikgrinaker): this needs to remove tripped circuit breakers
// from the metrics, otherwise they'll appear as tripped forever.
// However, there are race conditions with concurrent probes that can
// lead to metrics gauge leaks (both positive and negative), so we'll
// have to make sure we reap the probes here first.
d.metrics.CircuitBreaker.Replicas.Dec(1)
gced++
}
}
return true
Expand Down Expand Up @@ -667,6 +660,14 @@ func (r *ReplicaCircuitBreaker) launchProbe(report func(error), done func()) {

// Probe the replica.
err := r.sendProbe(ctx, transport)

// If the context (with no timeout) failed, we're shutting down. Just exit
// the probe without reporting the result (which could trip the breaker).
if ctx.Err() != nil {
return
}

// Report the probe result.
report(err)
if err == nil {
// On a successful probe, record the success and stop probing.
Expand Down Expand Up @@ -739,9 +740,9 @@ func (r *ReplicaCircuitBreaker) launchProbe(report func(error), done func()) {
// these replicas.
func (r *ReplicaCircuitBreaker) sendProbe(ctx context.Context, transport Transport) error {
// We don't use timeutil.RunWithTimeout() because we need to be able to
// differentiate which context failed.
// differentiate whether the context timed out.
timeout := CircuitBreakerProbeTimeout.Get(&r.d.settings.SV)
sendCtx, cancel := context.WithTimeout(ctx, timeout) // nolint:context
ctx, cancel := context.WithTimeout(ctx, timeout) // nolint:context
defer cancel()

transport.Reset()
Expand All @@ -756,19 +757,15 @@ func (r *ReplicaCircuitBreaker) sendProbe(ctx context.Context, transport Transpo
})

log.VEventf(ctx, 2, "sending probe to %s: %s", r.id(), ba)
br, err := transport.SendNext(sendCtx, ba)
br, err := transport.SendNext(ctx, ba)
log.VEventf(ctx, 2, "probe result from %s: br=%v err=%v", r.id(), br, err)

// Handle local send errors.
if err != nil {
// If the given context was cancelled, we're shutting down. Stop probing.
if ctx.Err() != nil {
return nil
}

// If the send context timed out, fail.
if ctxErr := sendCtx.Err(); ctxErr != nil {
return errors.Wrapf(ctxErr, "probe timed out")
// If the context timed out, fail. The caller will handle the case where
// we're shutting down.
if err := ctx.Err(); err != nil {
return errors.Wrapf(err, "probe timed out")
}

// Any other local error is likely a networking/gRPC issue. This includes if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,36 +187,34 @@ var (

var reqMethodToCap = map[kvpb.Method]tenantcapabilities.ID{
// The following requests are authorized for all workloads.
kvpb.AddSSTable: noCapCheckNeeded,
// TODO(dt): only allow system tenant to send LinkExternalSSTable requests.
kvpb.LinkExternalSSTable: noCapCheckNeeded,
kvpb.Barrier: noCapCheckNeeded,
kvpb.ClearRange: noCapCheckNeeded,
kvpb.ConditionalPut: noCapCheckNeeded,
kvpb.Delete: noCapCheckNeeded,
kvpb.DeleteRange: noCapCheckNeeded,
kvpb.EndTxn: noCapCheckNeeded,
kvpb.Export: noCapCheckNeeded,
kvpb.Get: noCapCheckNeeded,
kvpb.HeartbeatTxn: noCapCheckNeeded,
kvpb.Increment: noCapCheckNeeded,
kvpb.InitPut: noCapCheckNeeded,
kvpb.IsSpanEmpty: noCapCheckNeeded,
kvpb.LeaseInfo: noCapCheckNeeded,
kvpb.PushTxn: noCapCheckNeeded,
kvpb.Put: noCapCheckNeeded,
kvpb.QueryIntent: noCapCheckNeeded,
kvpb.QueryLocks: noCapCheckNeeded,
kvpb.QueryTxn: noCapCheckNeeded,
kvpb.RangeStats: noCapCheckNeeded,
kvpb.RecoverTxn: noCapCheckNeeded,
kvpb.Refresh: noCapCheckNeeded,
kvpb.RefreshRange: noCapCheckNeeded,
kvpb.ResolveIntent: noCapCheckNeeded,
kvpb.ResolveIntentRange: noCapCheckNeeded,
kvpb.ReverseScan: noCapCheckNeeded,
kvpb.RevertRange: noCapCheckNeeded,
kvpb.Scan: noCapCheckNeeded,
kvpb.AddSSTable: noCapCheckNeeded,
kvpb.Barrier: noCapCheckNeeded,
kvpb.ClearRange: noCapCheckNeeded,
kvpb.ConditionalPut: noCapCheckNeeded,
kvpb.Delete: noCapCheckNeeded,
kvpb.DeleteRange: noCapCheckNeeded,
kvpb.EndTxn: noCapCheckNeeded,
kvpb.Export: noCapCheckNeeded,
kvpb.Get: noCapCheckNeeded,
kvpb.HeartbeatTxn: noCapCheckNeeded,
kvpb.Increment: noCapCheckNeeded,
kvpb.InitPut: noCapCheckNeeded,
kvpb.IsSpanEmpty: noCapCheckNeeded,
kvpb.LeaseInfo: noCapCheckNeeded,
kvpb.PushTxn: noCapCheckNeeded,
kvpb.Put: noCapCheckNeeded,
kvpb.QueryIntent: noCapCheckNeeded,
kvpb.QueryLocks: noCapCheckNeeded,
kvpb.QueryTxn: noCapCheckNeeded,
kvpb.RangeStats: noCapCheckNeeded,
kvpb.RecoverTxn: noCapCheckNeeded,
kvpb.Refresh: noCapCheckNeeded,
kvpb.RefreshRange: noCapCheckNeeded,
kvpb.ResolveIntent: noCapCheckNeeded,
kvpb.ResolveIntentRange: noCapCheckNeeded,
kvpb.ReverseScan: noCapCheckNeeded,
kvpb.RevertRange: noCapCheckNeeded,
kvpb.Scan: noCapCheckNeeded,

// The following are authorized via specific capabilities.
kvpb.AdminChangeReplicas: tenantcapabilities.CanAdminRelocateRange,
Expand All @@ -243,6 +241,7 @@ var reqMethodToCap = map[kvpb.Method]tenantcapabilities.ID{
kvpb.TransferLease: onlySystemTenant,
kvpb.TruncateLog: onlySystemTenant,
kvpb.WriteBatch: onlySystemTenant,
kvpb.LinkExternalSSTable: onlySystemTenant,
}

const (
Expand Down

0 comments on commit 2bd61ba

Please sign in to comment.