From c449d359229bb3d7187d92a4bf42400fd7ebee95 Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Wed, 18 Dec 2024 10:11:33 -0500
Subject: [PATCH] kvserver: add metric for rangefeed cancellations due to lag

Add a new counter metric,
`kv.rangefeed.closed_timestamp.slow_ranges.cancelled`, which is incremented
each time a rangefeed is cancelled server-side due to a chronically lagging
closed timestamp (see #137531).

Part of: #136214

Release note: None
---
 docs/generated/metrics/metrics.html            |  1 +
 .../kvserver/flow_control_integration_test.go  | 22 +++++++++
 pkg/kv/kvserver/rangefeed/metrics.go           | 47 +++++++++++--------
 pkg/kv/kvserver/replica_rangefeed.go           |  1 +
 .../send_queue_range_feed                      | 23 +++++++++
 5 files changed, 75 insertions(+), 19 deletions(-)

diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 46e068ec1cdc..2f1cf8e0fbb7 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -244,6 +244,7 @@
 <tr><td>STORAGE</td><td>kv.rangefeed.budget_allocation_failed</td><td>Number of times RangeFeed failed because memory budget was exceeded</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>STORAGE</td><td>kv.rangefeed.catchup_scan_nanos</td><td>Time spent in RangeFeed catchup scan</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>STORAGE</td><td>kv.rangefeed.closed_timestamp.slow_ranges</td><td>Number of ranges that have a closed timestamp lagging by more than 5x target lag. Periodically re-calculated</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>STORAGE</td><td>kv.rangefeed.closed_timestamp.slow_ranges.cancelled</td><td>Number of rangefeeds that were cancelled due to a chronically lagging closed timestamp</td><td>Cancellation Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>STORAGE</td><td>kv.rangefeed.closed_timestamp_max_behind_nanos</td><td>Largest latency between realtime and replica max closed timestamp for replicas that have active rangeeds on them</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>STORAGE</td><td>kv.rangefeed.mem_shared</td><td>Memory usage by rangefeeds</td><td>Memory</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>STORAGE</td><td>kv.rangefeed.mem_system</td><td>Memory usage by rangefeeds on system ranges</td><td>Memory</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go
index 67e45a400840..966ff7ee819f 100644
--- a/pkg/kv/kvserver/flow_control_integration_test.go
+++ b/pkg/kv/kvserver/flow_control_integration_test.go
@@ -6277,6 +6277,7 @@ func TestFlowControlSendQueueRangeFeed(t *testing.T) {
 	require.NoError(t, err)
 	h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
 	n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	n3 := sqlutils.MakeSQLRunner(tc.ServerConn(2))
 	h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
 	h.resetV2TokenMetrics(ctx)
 	h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
@@ -6311,6 +6312,18 @@ func TestFlowControlSendQueueRangeFeed(t *testing.T) {
 		})
 	}
 
+	// We will use this metric to observe the server-side rangefeed cancellation,
+	// which should be zero prior to the send queue developing and non-zero
+	// shortly after.
+	const rangeFeedCancelMetricQueryStr = `
+  SELECT
+    name,
+    value
+  FROM crdb_internal.node_metrics
+  WHERE name LIKE 'kv.rangefeed.closed_timestamp.slow_ranges.cancelled'
+  ORDER BY name ASC;
+`
+
 	closeFeed := rangeFeed(
 		ctx,
 		ts.DistSenderI(),
@@ -6341,6 +6354,11 @@ func TestFlowControlSendQueueRangeFeed(t *testing.T) {
 	h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
 	h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
 
+	h.comment(`
+-- Observe the server-side rangefeed cancellation metric on n3; before a send
+-- queue develops, it should be zero:`)
+	h.query(n3, rangeFeedCancelMetricQueryStr)
+
 	h.comment(`(Sending 1 MiB put request to develop a send queue)`)
 	h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
 	h.comment(`(Sent 1 MiB put request)`)
@@ -6370,6 +6388,10 @@ func TestFlowControlSendQueueRangeFeed(t *testing.T) {
 	newNode := curRangeFeedNodeID.Load().(roachpb.NodeID)
 	h.comment(fmt.Sprintf(`(Rangefeed moved to n%v)`, newNode))
 
+	h.comment(`
+-- Observe that the server-side rangefeed cancellation metric increased on n3:`)
+	h.query(n3, rangeFeedCancelMetricQueryStr)
+
 	h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
 	setTokenReturnEnabled(true /* enabled */, 2)
 	h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
diff --git a/pkg/kv/kvserver/rangefeed/metrics.go b/pkg/kv/kvserver/rangefeed/metrics.go
index a8d7b2912f87..353e487aa746 100644
--- a/pkg/kv/kvserver/rangefeed/metrics.go
+++ b/pkg/kv/kvserver/rangefeed/metrics.go
@@ -52,6 +52,13 @@ var (
 		Measurement: "Ranges",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaRangeFeedSlowClosedTimestampCancelledRanges = metric.Metadata{
+		Name: "kv.rangefeed.closed_timestamp.slow_ranges.cancelled",
+		Help: "Number of rangefeeds that were cancelled due to a chronically " +
+			"lagging closed timestamp",
+		Measurement: "Cancellation Count",
+		Unit:        metric.Unit_COUNT,
+	}
 	metaRangeFeedProcessorsGO = metric.Metadata{
 		Name: "kv.rangefeed.processors_goroutine",
 		Help: "Number of active RangeFeed processors using goroutines",
@@ -86,14 +93,15 @@ var (
 
 // Metrics are for production monitoring of RangeFeeds.
 type Metrics struct {
-	RangeFeedCatchUpScanNanos              *metric.Counter
-	RangeFeedBudgetExhausted               *metric.Counter
-	RangefeedProcessorQueueTimeout         *metric.Counter
-	RangeFeedBudgetBlocked                 *metric.Counter
-	RangeFeedRegistrations                 *metric.Gauge
-	RangeFeedClosedTimestampMaxBehindNanos *metric.Gauge
-	RangeFeedSlowClosedTimestampRanges     *metric.Gauge
-	RangeFeedSlowClosedTimestampLogN       log.EveryN
+	RangeFeedCatchUpScanNanos                   *metric.Counter
+	RangeFeedBudgetExhausted                    *metric.Counter
+	RangefeedProcessorQueueTimeout              *metric.Counter
+	RangeFeedBudgetBlocked                      *metric.Counter
+	RangeFeedSlowClosedTimestampCancelledRanges *metric.Counter
+	RangeFeedRegistrations                      *metric.Gauge
+	RangeFeedClosedTimestampMaxBehindNanos      *metric.Gauge
+	RangeFeedSlowClosedTimestampRanges          *metric.Gauge
+	RangeFeedSlowClosedTimestampLogN            log.EveryN
 	// RangeFeedSlowClosedTimestampNudgeSem bounds the amount of work that can be
 	// spun up on behalf of the RangeFeed nudger. We don't expect to hit this
 	// limit, but it's here to limit the effect on stability in case something
@@ -112,17 +120,18 @@ func (*Metrics) MetricStruct() {}
 
 // NewMetrics makes the metrics for RangeFeeds monitoring.
 func NewMetrics() *Metrics {
 	return &Metrics{
-		RangeFeedCatchUpScanNanos:              metric.NewCounter(metaRangeFeedCatchUpScanNanos),
-		RangefeedProcessorQueueTimeout:         metric.NewCounter(metaQueueTimeout),
-		RangeFeedBudgetExhausted:               metric.NewCounter(metaRangeFeedExhausted),
-		RangeFeedBudgetBlocked:                 metric.NewCounter(metaRangeFeedBudgetBlocked),
-		RangeFeedRegistrations:                 metric.NewGauge(metaRangeFeedRegistrations),
-		RangeFeedClosedTimestampMaxBehindNanos: metric.NewGauge(metaRangeFeedClosedTimestampMaxBehindNanos),
-		RangeFeedSlowClosedTimestampRanges:     metric.NewGauge(metaRangefeedSlowClosedTimestampRanges),
-		RangeFeedSlowClosedTimestampLogN:       log.Every(5 * time.Second),
-		RangeFeedSlowClosedTimestampNudgeSem:   make(chan struct{}, 1024),
-		RangeFeedProcessorsGO:                  metric.NewGauge(metaRangeFeedProcessorsGO),
-		RangeFeedProcessorsScheduler:           metric.NewGauge(metaRangeFeedProcessorsScheduler),
+		RangeFeedCatchUpScanNanos:                   metric.NewCounter(metaRangeFeedCatchUpScanNanos),
+		RangefeedProcessorQueueTimeout:              metric.NewCounter(metaQueueTimeout),
+		RangeFeedBudgetExhausted:                    metric.NewCounter(metaRangeFeedExhausted),
+		RangeFeedBudgetBlocked:                      metric.NewCounter(metaRangeFeedBudgetBlocked),
+		RangeFeedSlowClosedTimestampCancelledRanges: metric.NewCounter(metaRangeFeedSlowClosedTimestampCancelledRanges),
+		RangeFeedRegistrations:                      metric.NewGauge(metaRangeFeedRegistrations),
+		RangeFeedClosedTimestampMaxBehindNanos:      metric.NewGauge(metaRangeFeedClosedTimestampMaxBehindNanos),
+		RangeFeedSlowClosedTimestampRanges:          metric.NewGauge(metaRangefeedSlowClosedTimestampRanges),
+		RangeFeedSlowClosedTimestampLogN:            log.Every(5 * time.Second),
+		RangeFeedSlowClosedTimestampNudgeSem:        make(chan struct{}, 1024),
+		RangeFeedProcessorsGO:                       metric.NewGauge(metaRangeFeedProcessorsGO),
+		RangeFeedProcessorsScheduler:                metric.NewGauge(metaRangeFeedProcessorsScheduler),
 	}
 }
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index a54662c03f29..13b4c7005cea 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -923,6 +923,7 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(
 			log.Warningf(ctx, `RangeFeed is too far behind, cancelling for replanning [%v]`, signal)
 			r.disconnectRangefeedWithReason(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)
+			r.store.metrics.RangeFeedMetrics.RangeFeedSlowClosedTimestampCancelledRanges.Inc(1)
 		}
 		return nil, nil
 	})
diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_feed b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_feed
index 8b436e55593b..08ab45b70450 100644
--- a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_feed
+++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_feed
@@ -12,6 +12,18 @@ echo
 -- allows the rangefeed request to be re-routed to another replica.
 
 
+-- Observe the server-side rangefeed cancellation metric on n3; before a send
+-- queue develops, it should be zero:
+SELECT
+  name,
+  value
+  FROM crdb_internal.node_metrics
+  WHERE name LIKE 'kv.rangefeed.closed_timestamp.slow_ranges.cancelled'
+  ORDER BY name ASC;
+
+  kv.rangefeed.closed_timestamp.slow_ranges.cancelled | 0
+
+
 (Sending 1 MiB put request to develop a send queue)
@@ -66,6 +78,17 @@ SELECT store_id,
 (Rangefeed moved to n1)
 
 
+-- Observe that the server-side rangefeed cancellation metric increased on n3:
+SELECT
+  name,
+  value
+  FROM crdb_internal.node_metrics
+  WHERE name LIKE 'kv.rangefeed.closed_timestamp.slow_ranges.cancelled'
+  ORDER BY name ASC;
+
+  kv.rangefeed.closed_timestamp.slow_ranges.cancelled | 1
+
+
 -- (Allowing below-raft admission to proceed on n3.)
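
For readers outside the CockroachDB tree, below is a minimal sketch of the counter pattern this patch follows: declare counter metadata, hang the counter off a metrics struct, and bump it on the server-side cancellation path. Only the util/metric calls (metric.Metadata, metric.Unit_COUNT, metric.NewCounter, Counter.Inc, Counter.Count) mirror the diff above; the surrounding names (metaCancelled, rangeFeedMetrics, newRangeFeedMetrics, onChronicLag) are illustrative stand-ins, not CockroachDB identifiers.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// metaCancelled mirrors the metadata added in pkg/kv/kvserver/rangefeed/metrics.go.
var metaCancelled = metric.Metadata{
	Name: "kv.rangefeed.closed_timestamp.slow_ranges.cancelled",
	Help: "Number of rangefeeds that were cancelled due to a chronically " +
		"lagging closed timestamp",
	Measurement: "Cancellation Count",
	Unit:        metric.Unit_COUNT,
}

// rangeFeedMetrics is a simplified stand-in for the rangefeed Metrics struct.
type rangeFeedMetrics struct {
	SlowClosedTimestampCancelledRanges *metric.Counter
}

// newRangeFeedMetrics constructs the counter from its metadata, as NewMetrics
// does in the patch.
func newRangeFeedMetrics() *rangeFeedMetrics {
	return &rangeFeedMetrics{
		SlowClosedTimestampCancelledRanges: metric.NewCounter(metaCancelled),
	}
}

// onChronicLag models the branch in handleClosedTimestampUpdateRaftMuLocked:
// first disconnect the chronically lagging rangefeed, then record the
// cancellation on the counter.
func onChronicLag(m *rangeFeedMetrics, disconnect func()) {
	disconnect()
	m.SlowClosedTimestampCancelledRanges.Inc(1)
}

func main() {
	m := newRangeFeedMetrics()
	onChronicLag(m, func() { fmt.Println("rangefeed disconnected for replanning") })
	fmt.Println("cancellations:", m.SlowClosedTimestampCancelledRanges.Count())
}

Once the real Metrics struct is registered with the store's metric registry, the counter surfaces through crdb_internal.node_metrics, which is how the test above reads it before and after the rangefeed is re-planned.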