diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 46e068ec1cdc..2f1cf8e0fbb7 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -244,6 +244,7 @@
STORAGE | kv.rangefeed.budget_allocation_failed | Number of times RangeFeed failed because memory budget was exceeded | Events | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | kv.rangefeed.catchup_scan_nanos | Time spent in RangeFeed catchup scan | Nanoseconds | COUNTER | NANOSECONDS | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | kv.rangefeed.closed_timestamp.slow_ranges | Number of ranges that have a closed timestamp lagging by more than 5x target lag. Periodically re-calculated | Ranges | GAUGE | COUNT | AVG | NONE |
+STORAGE | kv.rangefeed.closed_timestamp.slow_ranges.cancelled | Number of rangefeeds that were cancelled due to a chronically lagging closed timestamp | Cancellation Count | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | kv.rangefeed.closed_timestamp_max_behind_nanos | Largest latency between realtime and replica max closed timestamp for replicas that have active rangefeeds on them | Nanoseconds | GAUGE | NANOSECONDS | AVG | NONE |
STORAGE | kv.rangefeed.mem_shared | Memory usage by rangefeeds | Memory | GAUGE | BYTES | AVG | NONE |
STORAGE | kv.rangefeed.mem_system | Memory usage by rangefeeds on system ranges | Memory | GAUGE | BYTES | AVG | NONE |
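
(Reviewer note: a minimal, hedged sketch of reading the new counter in-process.
Store.Metrics(), the exported RangeFeedMetrics field, and Counter.Count() are
existing APIs visible in this change's context; the helper itself is
illustrative and not part of this patch.)

package kvserver

import "fmt"

// reportRangefeedCancellations prints the current value of the new counter
// for a single store. Purely illustrative.
func reportRangefeedCancellations(s *Store) {
	n := s.Metrics().RangeFeedMetrics.RangeFeedSlowClosedTimestampCancelledRanges.Count()
	fmt.Printf("kv.rangefeed.closed_timestamp.slow_ranges.cancelled = %d\n", n)
}
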
diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go
index 67e45a400840..966ff7ee819f 100644
--- a/pkg/kv/kvserver/flow_control_integration_test.go
+++ b/pkg/kv/kvserver/flow_control_integration_test.go
@@ -6277,6 +6277,7 @@ func TestFlowControlSendQueueRangeFeed(t *testing.T) {
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ n3 := sqlutils.MakeSQLRunner(tc.ServerConn(2))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.resetV2TokenMetrics(ctx)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
@@ -6311,6 +6312,18 @@ func TestFlowControlSendQueueRangeFeed(t *testing.T) {
})
}
+	// We will use this metric to observe the server-side rangefeed cancellation;
+	// it should be zero before the send queue develops and become non-zero
+	// shortly afterwards.
+ const rangeFeedCancelMetricQueryStr = `
+ SELECT
+ name,
+ value
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE 'kv.rangefeed.closed_timestamp.slow_ranges.cancelled'
+ ORDER BY name ASC;
+`
+
closeFeed := rangeFeed(
ctx,
ts.DistSenderI(),
@@ -6341,6 +6354,11 @@ func TestFlowControlSendQueueRangeFeed(t *testing.T) {
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
+ h.comment(`
+-- Observe the server-side rangefeed cancellation metric on n3; before a send
+-- queue develops, it should be zero:`)
+ h.query(n3, rangeFeedCancelMetricQueryStr)
+
h.comment(`(Sending 1 MiB put request to develop a send queue)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request)`)
@@ -6370,6 +6388,10 @@ func TestFlowControlSendQueueRangeFeed(t *testing.T) {
newNode := curRangeFeedNodeID.Load().(roachpb.NodeID)
h.comment(fmt.Sprintf(`(Rangefeed moved to n%v)`, newNode))
+ h.comment(`
+-- Observe that the server-side rangefeed cancellation metric increased on n3:`)
+ h.query(n3, rangeFeedCancelMetricQueryStr)
+
h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
setTokenReturnEnabled(true /* enabled */, 2)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
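
(Aside for test authors: the counter is bumped on the server's closed-timestamp
update path, so it can trail the client-visible replanning. A hedged sketch of
polling for it instead of reading it once, reusing n3, t, and
rangeFeedCancelMetricQueryStr from the test above; the polling is illustrative,
not part of this change. Note that crdb_internal.node_metrics reports values as
floats.)

	// Poll until the cancellation counter on n3 becomes non-zero.
	testutils.SucceedsSoon(t, func() error {
		var name string
		var value float64
		n3.QueryRow(t, rangeFeedCancelMetricQueryStr).Scan(&name, &value)
		if value == 0 {
			return errors.Errorf("%s is still zero", name)
		}
		return nil
	})
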
diff --git a/pkg/kv/kvserver/rangefeed/metrics.go b/pkg/kv/kvserver/rangefeed/metrics.go
index a8d7b2912f87..353e487aa746 100644
--- a/pkg/kv/kvserver/rangefeed/metrics.go
+++ b/pkg/kv/kvserver/rangefeed/metrics.go
@@ -52,6 +52,13 @@ var (
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
+ metaRangeFeedSlowClosedTimestampCancelledRanges = metric.Metadata{
+ Name: "kv.rangefeed.closed_timestamp.slow_ranges.cancelled",
+ Help: "Number of rangefeeds that were cancelled due to a chronically " +
+ "lagging closed timestamp",
+ Measurement: "Cancellation Count",
+ Unit: metric.Unit_COUNT,
+ }
metaRangeFeedProcessorsGO = metric.Metadata{
Name: "kv.rangefeed.processors_goroutine",
Help: "Number of active RangeFeed processors using goroutines",
@@ -86,14 +93,15 @@ var (
// Metrics are for production monitoring of RangeFeeds.
type Metrics struct {
- RangeFeedCatchUpScanNanos *metric.Counter
- RangeFeedBudgetExhausted *metric.Counter
- RangefeedProcessorQueueTimeout *metric.Counter
- RangeFeedBudgetBlocked *metric.Counter
- RangeFeedRegistrations *metric.Gauge
- RangeFeedClosedTimestampMaxBehindNanos *metric.Gauge
- RangeFeedSlowClosedTimestampRanges *metric.Gauge
- RangeFeedSlowClosedTimestampLogN log.EveryN
+ RangeFeedCatchUpScanNanos *metric.Counter
+ RangeFeedBudgetExhausted *metric.Counter
+ RangefeedProcessorQueueTimeout *metric.Counter
+ RangeFeedBudgetBlocked *metric.Counter
+ RangeFeedSlowClosedTimestampCancelledRanges *metric.Counter
+ RangeFeedRegistrations *metric.Gauge
+ RangeFeedClosedTimestampMaxBehindNanos *metric.Gauge
+ RangeFeedSlowClosedTimestampRanges *metric.Gauge
+ RangeFeedSlowClosedTimestampLogN log.EveryN
// RangeFeedSlowClosedTimestampNudgeSem bounds the amount of work that can be
// spun up on behalf of the RangeFeed nudger. We don't expect to hit this
// limit, but it's here to limit the effect on stability in case something
@@ -112,17 +120,18 @@ func (*Metrics) MetricStruct() {}
// NewMetrics makes the metrics for RangeFeeds monitoring.
func NewMetrics() *Metrics {
return &Metrics{
- RangeFeedCatchUpScanNanos: metric.NewCounter(metaRangeFeedCatchUpScanNanos),
- RangefeedProcessorQueueTimeout: metric.NewCounter(metaQueueTimeout),
- RangeFeedBudgetExhausted: metric.NewCounter(metaRangeFeedExhausted),
- RangeFeedBudgetBlocked: metric.NewCounter(metaRangeFeedBudgetBlocked),
- RangeFeedRegistrations: metric.NewGauge(metaRangeFeedRegistrations),
- RangeFeedClosedTimestampMaxBehindNanos: metric.NewGauge(metaRangeFeedClosedTimestampMaxBehindNanos),
- RangeFeedSlowClosedTimestampRanges: metric.NewGauge(metaRangefeedSlowClosedTimestampRanges),
- RangeFeedSlowClosedTimestampLogN: log.Every(5 * time.Second),
- RangeFeedSlowClosedTimestampNudgeSem: make(chan struct{}, 1024),
- RangeFeedProcessorsGO: metric.NewGauge(metaRangeFeedProcessorsGO),
- RangeFeedProcessorsScheduler: metric.NewGauge(metaRangeFeedProcessorsScheduler),
+ RangeFeedCatchUpScanNanos: metric.NewCounter(metaRangeFeedCatchUpScanNanos),
+ RangefeedProcessorQueueTimeout: metric.NewCounter(metaQueueTimeout),
+ RangeFeedBudgetExhausted: metric.NewCounter(metaRangeFeedExhausted),
+ RangeFeedBudgetBlocked: metric.NewCounter(metaRangeFeedBudgetBlocked),
+ RangeFeedSlowClosedTimestampCancelledRanges: metric.NewCounter(metaRangeFeedSlowClosedTimestampCancelledRanges),
+ RangeFeedRegistrations: metric.NewGauge(metaRangeFeedRegistrations),
+ RangeFeedClosedTimestampMaxBehindNanos: metric.NewGauge(metaRangeFeedClosedTimestampMaxBehindNanos),
+ RangeFeedSlowClosedTimestampRanges: metric.NewGauge(metaRangefeedSlowClosedTimestampRanges),
+ RangeFeedSlowClosedTimestampLogN: log.Every(5 * time.Second),
+ RangeFeedSlowClosedTimestampNudgeSem: make(chan struct{}, 1024),
+ RangeFeedProcessorsGO: metric.NewGauge(metaRangeFeedProcessorsGO),
+ RangeFeedProcessorsScheduler: metric.NewGauge(metaRangeFeedProcessorsScheduler),
}
}
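
(For context on the metric package pattern used above: a counter is declared
via a metric.Metadata value and constructed with metric.NewCounter; wiring the
Metrics struct into a metric.Registry is what exposes the new field. A minimal,
self-contained sketch with hypothetical metadata, not code from this change.)

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

func main() {
	// Hypothetical metadata mirroring the shape of
	// metaRangeFeedSlowClosedTimestampCancelledRanges above.
	meta := metric.Metadata{
		Name:        "example.cancellations",
		Help:        "Example cancellation counter",
		Measurement: "Cancellation Count",
		Unit:        metric.Unit_COUNT,
	}
	c := metric.NewCounter(meta)
	c.Inc(1) // counters are monotonic; dashboards typically graph the rate
	fmt.Println(c.Count()) // 1
}
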
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index a54662c03f29..13b4c7005cea 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -923,6 +923,7 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(
log.Warningf(ctx,
`RangeFeed is too far behind, cancelling for replanning [%v]`, signal)
r.disconnectRangefeedWithReason(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)
+ r.store.metrics.RangeFeedMetrics.RangeFeedSlowClosedTimestampCancelledRanges.Inc(1)
}
return nil, nil
})
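
(Placement note: the increment sits directly after disconnectRangefeedWithReason,
so each cancellation is counted exactly once, and clients observe
REASON_RANGEFEED_CLOSED, which the rangefeed client treats as retriable. A
hedged sketch of classifying that error on the consumer side; the helper is
illustrative, while kvpb.RangeFeedRetryError and its Reason field are existing
types.)

package kvclient // hypothetical package, for illustration only

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/errors"
)

// isServerCancelledRangefeed reports whether err is the retry error emitted
// on the cancellation path above.
func isServerCancelledRangefeed(err error) bool {
	var retryErr *kvpb.RangeFeedRetryError
	return errors.As(err, &retryErr) &&
		retryErr.Reason == kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED
}
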
diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_feed b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_feed
index 8b436e55593b..08ab45b70450 100644
--- a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_feed
+++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_feed
@@ -12,6 +12,18 @@ echo
-- allows the rangefeed request to be re-routed to another replica.
+-- Observe the server-side rangefeed cancellation metric on n3; before a send
+-- queue develops, it should be zero:
+SELECT
+ name,
+ value
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE 'kv.rangefeed.closed_timestamp.slow_ranges.cancelled'
+ ORDER BY name ASC;
+
+ kv.rangefeed.closed_timestamp.slow_ranges.cancelled | 0
+
+
(Sending 1 MiB put request to develop a send queue)
@@ -66,6 +78,17 @@ SELECT store_id,
(Rangefeed moved to n1)
+-- Observe that the server-side rangefeed cancellation metric increased on n3:
+SELECT
+ name,
+ value
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE 'kv.rangefeed.closed_timestamp.slow_ranges.cancelled'
+ ORDER BY name ASC;
+
+ kv.rangefeed.closed_timestamp.slow_ranges.cancelled | 1
+
+
-- (Allowing below-raft admission to proceed on n3.)