Skip to content

Commit

Permalink
kvserver: add metric for replica circuit breaker
Browse files Browse the repository at this point in the history
Add a metric that tracks how many replicas have tripped circuit
breakers.
Add a metric that counts the trip events as well. This can highlight
problems where the circuit is tripped in error and immediately untrips
(which may not be caught by the first metric).

Since tripped circuit breakers highlight an availability problem,
we're also adding an alert/aggregation rule.

Also, as requested by @mwang1026, report the count of trip events via
telemetry as well.

Fixes cockroachdb#74505.

Release note: None
  • Loading branch information
tbg authored and Gerardo Torres committed Jan 24, 2022
1 parent e71db80 commit 7e74f99
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 9 deletions.
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func TestReplicaCircuitBreaker_NotTripped(t *testing.T) {
// In this test, n1 holds the lease and we disable the probe and trip the
// breaker. While the breaker is tripped, requests fail-fast with either a
// breaker or lease error. When the probe is re-enabled, everything heals.
//
// This test also verifies the circuit breaker metrics. This is only done
// in this one test since little would be gained by adding it across the
// board.
func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -80,6 +84,14 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) {
tc.SetProbeEnabled(n1, false)
tc.Report(n1, errors.New("boom"))

s1 := tc.GetFirstStoreFromServer(t, n1)
s2 := tc.GetFirstStoreFromServer(t, n2)
// Both current and all-time trip events increment on n1, not on n2.
require.EqualValues(t, 1, s1.Metrics().ReplicaCircuitBreakerCurTripped.Value())
require.EqualValues(t, 1, s1.Metrics().ReplicaCircuitBreakerCumTripped.Count())
require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCurTripped.Value())
require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCumTripped.Count())

// n1 could theoretically still serve reads (there is a valid lease
// and none of the latches are taken), but since it is hard to determine
// that upfront we currently fail all reads as well.
Expand All @@ -103,6 +115,14 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) {
// Same behavior on writes.
tc.Report(n1, errors.New("boom again"))
tc.UntripsSoon(t, tc.Write, n1)

// Currently tripped drops back to zero, all-time is two (since we tripped
// it twice)
require.EqualValues(t, 0, s1.Metrics().ReplicaCircuitBreakerCurTripped.Value())
require.EqualValues(t, 2, s1.Metrics().ReplicaCircuitBreakerCumTripped.Count())
// s2 wasn't affected by any breaker events.
require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCurTripped.Value())
require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCumTripped.Count())
}

// In this scenario we have n1 holding the lease and we permanently trip the
Expand Down
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/metric_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

const (
unavailableRangesRuleName = "UnavailableRanges"
trippedReplicaCircuitBreakersRuleName = "TrippedReplicaCircuitBreakers"
underreplicatedRangesRuleName = "UnderreplicatedRanges"
requestsStuckInRaftRuleName = "RequestsStuckInRaft"
highOpenFDCountRuleName = "HighOpenFDCount"
Expand All @@ -38,6 +39,7 @@ const (
// YAML format.
func CreateAndAddRules(ctx context.Context, ruleRegistry *metric.RuleRegistry) {
createAndRegisterUnavailableRangesRule(ctx, ruleRegistry)
createAndRegisterTrippedReplicaCircuitBreakersRule(ctx, ruleRegistry)
createAndRegisterUnderReplicatedRangesRule(ctx, ruleRegistry)
createAndRegisterRequestsStuckInRaftRule(ctx, ruleRegistry)
createAndRegisterHighOpenFDCountRule(ctx, ruleRegistry)
Expand All @@ -62,6 +64,30 @@ func createAndRegisterUnavailableRangesRule(
recommendedHoldDuration := 10 * time.Minute
help := "This check detects when the number of ranges with less than quorum replicas live are non-zero for too long"

rule, err := metric.NewAlertingRule(
trippedReplicaCircuitBreakersRuleName,
expr,
annotations,
nil,
recommendedHoldDuration,
help,
true,
)
maybeAddRuleToRegistry(ctx, err, unavailableRangesRuleName, rule, ruleRegistry)
}

func createAndRegisterTrippedReplicaCircuitBreakersRule(
ctx context.Context, ruleRegistry *metric.RuleRegistry,
) {
expr := "(sum by(instance, cluster) (kv_replica_circuit_breaker_num_tripped_replicas)) > 0"
var annotations []metric.LabelPair
annotations = append(annotations, metric.LabelPair{
Name: proto.String("summary"),
Value: proto.String("Instance {{ $labels.instance }} has {{ $value }} tripped per-Replica circuit breakers"),
})
recommendedHoldDuration := 10 * time.Minute
help := "This check detects when Replicas have stopped serving traffic as a result of KV health issues"

unavailableRanges, err := metric.NewAlertingRule(
unavailableRangesRuleName,
expr,
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/metric_rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestMetricRules(t *testing.T) {
ruleRegistry := metric.NewRuleRegistry()
CreateAndAddRules(context.Background(), ruleRegistry)
require.NotNil(t, ruleRegistry.GetRuleForTest(unavailableRangesRuleName))
require.NotNil(t, ruleRegistry.GetRuleForTest(trippedReplicaCircuitBreakersRuleName))
require.NotNil(t, ruleRegistry.GetRuleForTest(underreplicatedRangesRuleName))
require.NotNil(t, ruleRegistry.GetRuleForTest(requestsStuckInRaftRuleName))
require.NotNil(t, ruleRegistry.GetRuleForTest(highOpenFDCountRuleName))
Expand All @@ -39,5 +40,5 @@ func TestMetricRules(t *testing.T) {
require.NotNil(t, ruleRegistry.GetRuleForTest(capacityAvailableRatioRuleName))
require.NotNil(t, ruleRegistry.GetRuleForTest(nodeCapacityAvailableRatioRuleName))
require.NotNil(t, ruleRegistry.GetRuleForTest(clusterCapacityAvailableRatioRuleName))
require.Equal(t, 11, ruleRegistry.GetRuleCountForTest())
require.Equal(t, 12, ruleRegistry.GetRuleCountForTest())
}
28 changes: 28 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,26 @@ throttled they do count towards 'delay.total' and 'delay.enginebackpressure'.
Measurement: "Attempts",
Unit: metric.Unit_COUNT,
}

// Replica circuit breaker.
metaReplicaCircuitBreakerCurTripped = metric.Metadata{
Name: "kv.replica_circuit_breaker.num_tripped_replicas",
Help: `Number of Replicas for which the per-Replica circuit breaker is currently tripped.
A nonzero value indicates range or replica unavailability, and should be investigated.
Replicas in this state will fail-fast all inbound requests.
`,
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}

// Replica circuit breaker.
metaReplicaCircuitBreakerCumTripped = metric.Metadata{
Name: "kv.replica_circuit_breaker.num_tripped_events",
Help: `Number of times the per-Replica circuit breakers tripped since process start.`,
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
)

// StoreMetrics is the set of metrics for a given store.
Expand Down Expand Up @@ -1470,6 +1490,10 @@ type StoreMetrics struct {

// Closed timestamp metrics.
ClosedTimestampMaxBehindNanos *metric.Gauge

// Replica circuit breaker.
ReplicaCircuitBreakerCurTripped *metric.Gauge
ReplicaCircuitBreakerCumTripped *metric.Counter
}

type tenantMetricsRef struct {
Expand Down Expand Up @@ -1908,6 +1932,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {

// Closed timestamp metrics.
ClosedTimestampMaxBehindNanos: metric.NewGauge(metaClosedTimestampMaxBehindNanos),

// Replica circuit breaker.
ReplicaCircuitBreakerCurTripped: metric.NewGauge(metaReplicaCircuitBreakerCurTripped),
ReplicaCircuitBreakerCumTripped: metric.NewCounter(metaReplicaCircuitBreakerCumTripped),
}
storeRegistry.AddMetricStruct(sm)

Expand Down
34 changes: 31 additions & 3 deletions pkg/kv/kvserver/replica_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
Expand Down Expand Up @@ -50,6 +51,9 @@ var replicaCircuitBreakerSlowReplicationThreshold = settings.RegisterPublicDurat
settings.NonNegativeDuration,
)

// Telemetry counter to count number of trip events.
var telemetryTripAsync = telemetry.GetCounterOnce("kv.replica_circuit_breaker.num_tripped_events")

// replicaCircuitBreaker is a wrapper around *circuit.Breaker that makes it
// convenient for use as a per-Replica circuit breaker.
type replicaCircuitBreaker struct {
Expand Down Expand Up @@ -104,6 +108,8 @@ func newReplicaCircuitBreaker(
stopper *stop.Stopper,
ambientCtx log.AmbientContext,
r replicaInCircuitBreaker,
onTrip func(),
onReset func(),
) *replicaCircuitBreaker {
br := &replicaCircuitBreaker{
stopper: stopper,
Expand All @@ -115,16 +121,38 @@ func newReplicaCircuitBreaker(
br.wrapped = circuit.NewBreaker(circuit.Options{
Name: "breaker", // log bridge has ctx tags
AsyncProbe: br.asyncProbe,
EventHandler: &circuit.EventLogger{
Log: func(buf redact.StringBuilder) {
log.Infof(ambientCtx.AnnotateCtx(context.Background()), "%s", buf)
EventHandler: &replicaCircuitBreakerLogger{
EventHandler: &circuit.EventLogger{
Log: func(buf redact.StringBuilder) {
log.Infof(ambientCtx.AnnotateCtx(context.Background()), "%s", buf)
},
},
onTrip: onTrip,
onReset: onReset,
},
})

return br
}

type replicaCircuitBreakerLogger struct {
circuit.EventHandler
onTrip func()
onReset func()
}

func (r replicaCircuitBreakerLogger) OnTrip(br *circuit.Breaker, prev, cur error) {
if prev == nil {
r.onTrip()
}
r.EventHandler.OnTrip(br, prev, cur)
}

func (r replicaCircuitBreakerLogger) OnReset(br *circuit.Breaker) {
r.onReset()
r.EventHandler.OnReset(br)
}

type probeKey struct{}

func isCircuitBreakerProbe(ctx context.Context) bool {
Expand Down
13 changes: 12 additions & 1 deletion pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -128,7 +129,17 @@ func newUnloadedReplica(
r.splitQueueThrottle = util.Every(splitQueueThrottleDuration)
r.mergeQueueThrottle = util.Every(mergeQueueThrottleDuration)

r.breaker = newReplicaCircuitBreaker(store.cfg.Settings, store.stopper, r.AmbientContext, r)
onTrip := func() {
telemetry.Inc(telemetryTripAsync)
r.store.Metrics().ReplicaCircuitBreakerCumTripped.Inc(1)
store.Metrics().ReplicaCircuitBreakerCurTripped.Inc(1)
}
onReset := func() {
store.Metrics().ReplicaCircuitBreakerCurTripped.Dec(1)
}
r.breaker = newReplicaCircuitBreaker(
store.cfg.Settings, store.stopper, r.AmbientContext, r, onTrip, onReset,
)
return r
}

Expand Down
15 changes: 11 additions & 4 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,27 +881,34 @@ var charts = []sectionDescription{
{
Title: "Latch",
Downsampler: DescribeAggregator_MAX,
Percentiles: false,
Metrics: []string{"requests.slow.latch"},
},
{
Title: "Stuck Acquiring Lease",
Downsampler: DescribeAggregator_MAX,
Percentiles: false,
Metrics: []string{"requests.slow.lease"},
},
{
Title: "Stuck in Raft",
Downsampler: DescribeAggregator_MAX,
Percentiles: false,
Metrics: []string{"requests.slow.raft"},
},
{
Title: "Stuck sending RPCs to range",
Downsampler: DescribeAggregator_MAX,
Percentiles: false,
Metrics: []string{"requests.slow.distsender"},
},
{
Title: "Replicas with tripped circuit breakers",
Downsampler: DescribeAggregator_MAX,
Metrics: []string{"kv.replica_circuit_breaker.num_tripped_replicas"},
},
{
Title: "Replica circuit breaker trip events",
Downsampler: DescribeAggregator_MAX,
Rate: DescribeDerivative_NON_NEGATIVE_DERIVATIVE,
Metrics: []string{"kv.replica_circuit_breaker.num_tripped_events"},
},
},
},
{
Expand Down

0 comments on commit 7e74f99

Please sign in to comment.