kvserver: add metric for rangefeed cancellations due to lag
Add a new counter metric,
`kv.rangefeed.closed_timestamp.slow_ranges.cancelled`, which is
incremented each time a rangefeed is cancelled server-side due to a
chronically lagging closed timestamp (see cockroachdb#137531).

Part of: cockroachdb#136214
Release note: None
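
For reference, the new counter can be read from SQL via crdb_internal.node_metrics
(the same query the updated test below uses); a minimal sketch, assuming it is run
against the node serving the rangefeed:

SELECT name, value
FROM crdb_internal.node_metrics
WHERE name = 'kv.rangefeed.closed_timestamp.slow_ranges.cancelled';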
kvoli committed Dec 18, 2024
1 parent 8cfc75e commit 930ba25
Showing 5 changed files with 75 additions and 19 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
@@ -244,6 +244,7 @@
<tr><td>STORAGE</td><td>kv.rangefeed.budget_allocation_failed</td><td>Number of times RangeFeed failed because memory budget was exceeded</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.catchup_scan_nanos</td><td>Time spent in RangeFeed catchup scan</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.closed_timestamp.slow_ranges</td><td>Number of ranges that have a closed timestamp lagging by more than 5x target lag. Periodically re-calculated</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.closed_timestamp.slow_ranges.cancelled</td><td>Number of rangefeeds that were cancelled due to a chronically lagging closed timestamp</td><td>Cancellation Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.closed_timestamp_max_behind_nanos</td><td>Largest latency between realtime and replica max closed timestamp for replicas that have active rangefeeds on them</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.mem_shared</td><td>Memory usage by rangefeeds</td><td>Memory</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.mem_system</td><td>Memory usage by rangefeeds on system ranges</td><td>Memory</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
22 changes: 22 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -6277,6 +6277,7 @@ func TestFlowControlSendQueueRangeFeed(t *testing.T) {
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
n3 := sqlutils.MakeSQLRunner(tc.ServerConn(2))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.resetV2TokenMetrics(ctx)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
@@ -6311,6 +6312,18 @@ func TestFlowControlSendQueueRangeFeed(t *testing.T) {
})
}

// We will use this metric to observe the server-side rangefeed cancellation,
// which should be zero before the send queue develops and become non-zero
// shortly after.
const rangeFeedCancelMetricQueryStr = `
SELECT
name,
value
FROM crdb_internal.node_metrics
WHERE name LIKE 'kv.rangefeed.closed_timestamp.slow_ranges.cancelled'
ORDER BY name ASC;
`

closeFeed := rangeFeed(
ctx,
ts.DistSenderI(),
@@ -6341,6 +6354,11 @@ func TestFlowControlSendQueueRangeFeed(t *testing.T) {
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`
-- Observe the server-side rangefeed cancellation metric on n3; before a send
-- queue develops, it should be zero:`)
h.query(n3, rangeFeedCancelMetricQueryStr)

h.comment(`(Sending 1 MiB put request to develop a send queue)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request)`)
@@ -6370,6 +6388,10 @@ func TestFlowControlSendQueueRangeFeed(t *testing.T) {
newNode := curRangeFeedNodeID.Load().(roachpb.NodeID)
h.comment(fmt.Sprintf(`(Rangefeed moved to n%v)`, newNode))

h.comment(`
-- Observe the server-side rangefeed cancellation metric increased on n3:`)
h.query(n3, rangeFeedCancelMetricQueryStr)

h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
setTokenReturnEnabled(true /* enabled */, 2)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
47 changes: 28 additions & 19 deletions pkg/kv/kvserver/rangefeed/metrics.go
@@ -52,6 +52,13 @@ var (
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaRangeFeedSlowClosedTimestampCancelledRanges = metric.Metadata{
Name: "kv.rangefeed.closed_timestamp.slow_ranges.cancelled",
Help: "Number of rangefeeds that were cancelled due to a chronically " +
"lagging closed timestamp",
Measurement: "Cancellation Count",
Unit: metric.Unit_COUNT,
}
metaRangeFeedProcessorsGO = metric.Metadata{
Name: "kv.rangefeed.processors_goroutine",
Help: "Number of active RangeFeed processors using goroutines",
@@ -86,14 +93,15 @@ var (

// Metrics are for production monitoring of RangeFeeds.
type Metrics struct {
RangeFeedCatchUpScanNanos *metric.Counter
RangeFeedBudgetExhausted *metric.Counter
RangefeedProcessorQueueTimeout *metric.Counter
RangeFeedBudgetBlocked *metric.Counter
RangeFeedRegistrations *metric.Gauge
RangeFeedClosedTimestampMaxBehindNanos *metric.Gauge
RangeFeedSlowClosedTimestampRanges *metric.Gauge
RangeFeedSlowClosedTimestampLogN log.EveryN
RangeFeedCatchUpScanNanos *metric.Counter
RangeFeedBudgetExhausted *metric.Counter
RangefeedProcessorQueueTimeout *metric.Counter
RangeFeedBudgetBlocked *metric.Counter
RangeFeedSlowClosedTimestampCancelledRanges *metric.Counter
RangeFeedRegistrations *metric.Gauge
RangeFeedClosedTimestampMaxBehindNanos *metric.Gauge
RangeFeedSlowClosedTimestampRanges *metric.Gauge
RangeFeedSlowClosedTimestampLogN log.EveryN
// RangeFeedSlowClosedTimestampNudgeSem bounds the amount of work that can be
// spun up on behalf of the RangeFeed nudger. We don't expect to hit this
// limit, but it's here to limit the effect on stability in case something
@@ -112,17 +120,18 @@ func (*Metrics) MetricStruct() {}
// NewMetrics makes the metrics for RangeFeeds monitoring.
func NewMetrics() *Metrics {
return &Metrics{
RangeFeedCatchUpScanNanos: metric.NewCounter(metaRangeFeedCatchUpScanNanos),
RangefeedProcessorQueueTimeout: metric.NewCounter(metaQueueTimeout),
RangeFeedBudgetExhausted: metric.NewCounter(metaRangeFeedExhausted),
RangeFeedBudgetBlocked: metric.NewCounter(metaRangeFeedBudgetBlocked),
RangeFeedRegistrations: metric.NewGauge(metaRangeFeedRegistrations),
RangeFeedClosedTimestampMaxBehindNanos: metric.NewGauge(metaRangeFeedClosedTimestampMaxBehindNanos),
RangeFeedSlowClosedTimestampRanges: metric.NewGauge(metaRangefeedSlowClosedTimestampRanges),
RangeFeedSlowClosedTimestampLogN: log.Every(5 * time.Second),
RangeFeedSlowClosedTimestampNudgeSem: make(chan struct{}, 1024),
RangeFeedProcessorsGO: metric.NewGauge(metaRangeFeedProcessorsGO),
RangeFeedProcessorsScheduler: metric.NewGauge(metaRangeFeedProcessorsScheduler),
RangeFeedCatchUpScanNanos: metric.NewCounter(metaRangeFeedCatchUpScanNanos),
RangefeedProcessorQueueTimeout: metric.NewCounter(metaQueueTimeout),
RangeFeedBudgetExhausted: metric.NewCounter(metaRangeFeedExhausted),
RangeFeedBudgetBlocked: metric.NewCounter(metaRangeFeedBudgetBlocked),
RangeFeedSlowClosedTimestampCancelledRanges: metric.NewCounter(metaRangeFeedSlowClosedTimestampCancelledRanges),
RangeFeedRegistrations: metric.NewGauge(metaRangeFeedRegistrations),
RangeFeedClosedTimestampMaxBehindNanos: metric.NewGauge(metaRangeFeedClosedTimestampMaxBehindNanos),
RangeFeedSlowClosedTimestampRanges: metric.NewGauge(metaRangefeedSlowClosedTimestampRanges),
RangeFeedSlowClosedTimestampLogN: log.Every(5 * time.Second),
RangeFeedSlowClosedTimestampNudgeSem: make(chan struct{}, 1024),
RangeFeedProcessorsGO: metric.NewGauge(metaRangeFeedProcessorsGO),
RangeFeedProcessorsScheduler: metric.NewGauge(metaRangeFeedProcessorsScheduler),
}
}

1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -923,6 +923,7 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(
log.Warningf(ctx,
`RangeFeed is too far behind, cancelling for replanning [%v]`, signal)
r.disconnectRangefeedWithReason(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)
r.store.metrics.RangeFeedMetrics.RangeFeedSlowClosedTimestampCancelledRanges.Inc(1)
}
return nil, nil
})
@@ -12,6 +12,18 @@ echo
-- allows the rangefeed request to be re-routed to another replica.


-- Observe the server-side rangefeed cancellation metric on n3; before a send
-- queue develops, it should be zero:
SELECT
name,
value
FROM crdb_internal.node_metrics
WHERE name LIKE 'kv.rangefeed.closed_timestamp.slow_ranges.cancelled'
ORDER BY name ASC;

kv.rangefeed.closed_timestamp.slow_ranges.cancelled | 0


(Sending 1 MiB put request to develop a send queue)


@@ -66,6 +78,17 @@ SELECT store_id,
(Rangefeed moved to n1)


-- Observe the server-side rangefeed cancellation metric increased on n3:
SELECT
name,
value
FROM crdb_internal.node_metrics
WHERE name LIKE 'kv.rangefeed.closed_timestamp.slow_ranges.cancelled'
ORDER BY name ASC;

kv.rangefeed.closed_timestamp.slow_ranges.cancelled | 1


-- (Allowing below-raft admission to proceed on n3.)


