diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 5ab48bd3212d..9660618710f7 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -199,6 +199,7 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util", "//pkg/util/caller", + "//pkg/util/contextutil", "//pkg/util/ctxgroup", "//pkg/util/errorutil", "//pkg/util/grpcutil", @@ -212,6 +213,7 @@ go_test( "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/shuffle", + "//pkg/util/span", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 41c65090757a..81dd7f82b930 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -244,33 +244,34 @@ type DistSenderMetrics struct { InLeaseTransferBackoffs *metric.Counter RangeLookups *metric.Counter SlowRPCs *metric.Gauge - RangefeedRanges *metric.Gauge - RangefeedCatchupRanges *metric.Gauge - RangefeedErrorCatchup *metric.Counter - RangefeedRestartRanges *metric.Counter - RangefeedRestartStuck *metric.Counter MethodCounts [kvpb.NumMethods]*metric.Counter ErrCounts [kvpb.NumErrors]*metric.Counter + DistSenderRangeFeedMetrics +} + +// DistSenderRangeFeedMetrics is a set of rangefeed specific metrics. +type DistSenderRangeFeedMetrics struct { + RangefeedRanges *metric.Gauge + RangefeedCatchupRanges *metric.Gauge + RangefeedErrorCatchup *metric.Counter + RangefeedRestartRanges *metric.Counter + RangefeedRestartStuck *metric.Counter } func makeDistSenderMetrics() DistSenderMetrics { m := DistSenderMetrics{ - BatchCount: metric.NewCounter(metaDistSenderBatchCount), - PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount), - AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount), - AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount), - SentCount: metric.NewCounter(metaTransportSentCount), - LocalSentCount: metric.NewCounter(metaTransportLocalSentCount), - NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount), - NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount), - InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount), - RangeLookups: metric.NewCounter(metaDistSenderRangeLookups), - SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs), - RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), - RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), - RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), - RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), - RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), + BatchCount: metric.NewCounter(metaDistSenderBatchCount), + PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount), + AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount), + AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount), + SentCount: metric.NewCounter(metaTransportSentCount), + LocalSentCount: metric.NewCounter(metaTransportLocalSentCount), + NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount), + NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount), + InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount), + RangeLookups: metric.NewCounter(metaDistSenderRangeLookups), + SlowRPCs: 
metric.NewGauge(metaDistSenderSlowRPCs), + DistSenderRangeFeedMetrics: makeDistSenderRangeFeedMetrics(), } for i := range m.MethodCounts { method := kvpb.Method(i).String() @@ -289,6 +290,19 @@ func makeDistSenderMetrics() DistSenderMetrics { return m } +func makeDistSenderRangeFeedMetrics() DistSenderRangeFeedMetrics { + return DistSenderRangeFeedMetrics{ + RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), + RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), + RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), + RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), + RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), + } +} + +// MetricStruct implements metrics.Struct +func (m DistSenderRangeFeedMetrics) MetricStruct() {} + // FirstRangeProvider is capable of providing DistSender with the descriptor of // the first range in the cluster and notifying the DistSender when the first // range in the cluster has changed. diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go index dbe3964b6f13..eaf3e74e7f48 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -44,6 +44,7 @@ type rangefeedMuxer struct { g ctxgroup.Group ds *DistSender + metrics *DistSenderRangeFeedMetrics cfg rangeFeedConfig registry *rangeFeedRegistry catchupSem *limit.ConcurrentRequestLimiter @@ -84,9 +85,13 @@ func muxRangeFeed( registry: rr, ds: ds, cfg: cfg, + metrics: &ds.metrics.DistSenderRangeFeedMetrics, catchupSem: catchupSem, eventCh: eventCh, } + if cfg.knobs.metrics != nil { + m.metrics = cfg.knobs.metrics + } divideAllSpansOnRangeBoundaries(spans, m.startSingleRangeFeed, ds, &m.g) return errors.CombineErrors(m.g.Wait(), ctx.Err()) @@ -158,7 +163,7 @@ type activeMuxRangeFeed struct { roachpb.ReplicaDescriptor startAfter hlc.Timestamp - // cathchupRes is the catchup scan quota acquired upon the + // catchupRes is the catchup scan quota acquired upon the // start of rangefeed. // It is released when this stream receives first non-empty checkpoint // (meaning: catchup scan completes). @@ -211,7 +216,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed( // Register active mux range feed. stream := &activeMuxRangeFeed{ - activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.ds.metrics.RangefeedRanges), + activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics.RangefeedRanges), rSpan: rs, startAfter: startAfter, token: token, @@ -241,7 +246,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error { // Before starting single rangefeed, acquire catchup scan quota. - catchupRes, err := acquireCatchupScanQuota(ctx, m.ds, m.catchupSem) + catchupRes, err := acquireCatchupScanQuota(ctx, &m.ds.st.SV, m.catchupSem, m.metrics) if err != nil { return err } @@ -387,13 +392,19 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed( recvErr = nil } + toRestart := ms.close() + // make sure that the underlying error is not fatal. If it is, there is no // reason to restart each rangefeed, so just bail out. if _, err := handleRangefeedError(ctx, recvErr); err != nil { + // Regardless of an error, release any resources (i.e. metrics) still + // being held by active stream. 
+ for _, s := range toRestart { + s.release() + } return err } - toRestart := ms.close() if log.V(1) { log.Infof(ctx, "mux to node %d restarted %d streams", ms.nodeID, len(toRestart)) } @@ -429,8 +440,14 @@ func (m *rangefeedMuxer) receiveEventsFromNode( continue } - if m.cfg.knobs.onMuxRangefeedEvent != nil { - m.cfg.knobs.onMuxRangefeedEvent(event) + if m.cfg.knobs.onRangefeedEvent != nil { + skip, err := m.cfg.knobs.onRangefeedEvent(ctx, active.Span, &event.RangeFeedEvent) + if err != nil { + return err + } + if skip { + continue + } } switch t := event.GetValue().(type) { @@ -451,7 +468,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode( case *kvpb.RangeFeedError: log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) if active.catchupRes != nil { - m.ds.metrics.RangefeedErrorCatchup.Inc(1) + m.metrics.RangefeedErrorCatchup.Inc(1) } ms.deleteStream(event.StreamID) // Restart rangefeed on another goroutine. Restart might be a bit @@ -473,7 +490,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode( } } -// restarActiveRangeFeeds restarts one or more rangefeeds. +// restartActiveRangeFeeds restarts one or more rangefeeds. func (m *rangefeedMuxer) restartActiveRangeFeeds( ctx context.Context, reason error, toRestart []*activeMuxRangeFeed, ) error { @@ -489,13 +506,7 @@ func (m *rangefeedMuxer) restartActiveRangeFeeds( func (m *rangefeedMuxer) restartActiveRangeFeed( ctx context.Context, active *activeMuxRangeFeed, reason error, ) error { - m.ds.metrics.RangefeedRestartRanges.Inc(1) - - if log.V(1) { - log.Infof(ctx, "RangeFeed %s@%s (r%d, replica %s) disconnected with last checkpoint %s ago: %v", - active.Span, active.StartAfter, active.RangeID, active.ReplicaDescriptor, - timeutil.Since(active.Resolved.GoTime()), reason) - } + m.metrics.RangefeedRestartRanges.Inc(1) active.setLastError(reason) // Release catchup scan reservation if any -- we will acquire another @@ -518,6 +529,12 @@ func (m *rangefeedMuxer) restartActiveRangeFeed( return err } + if log.V(1) { + log.Infof(ctx, "RangeFeed %s@%s (r%d, replica %s) disconnected with last checkpoint %s ago: %v (errInfo %v)", + active.Span, active.StartAfter, active.RangeID, active.ReplicaDescriptor, + timeutil.Since(active.Resolved.GoTime()), reason, errInfo) + } + if errInfo.evict { active.resetRouting(ctx, rangecache.EvictionToken{}) } @@ -587,13 +604,3 @@ func (c *muxStream) close() []*activeMuxRangeFeed { return toRestart } - -// a test only option to modify mux rangefeed event. -func withOnMuxEvent(fn func(event *kvpb.MuxRangeFeedEvent)) RangeFeedOption { - return optionFunc(func(c *rangeFeedConfig) { - c.knobs.onMuxRangefeedEvent = fn - }) -} - -// TestingWithOnMuxEvent allow external tests access to the withOnMuxEvent option. -var TestingWithOnMuxEvent = withOnMuxEvent diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 3a3eca031848..e601eea6c08a 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -93,7 +93,12 @@ type rangeFeedConfig struct { withDiff bool knobs struct { - onMuxRangefeedEvent func(event *kvpb.MuxRangeFeedEvent) + // onRangefeedEvent invoked on each rangefeed event. + // Returns boolean indicating if event should be skipped or an error + // indicating if rangefeed should terminate. + onRangefeedEvent func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error) + // metrics overrides rangefeed metrics to use. 
+ metrics *DistSenderRangeFeedMetrics } } @@ -189,7 +194,10 @@ func (ds *DistSender) RangeFeedSpans( for _, opt := range opts { opt.set(&cfg) } - + metrics := &ds.metrics.DistSenderRangeFeedMetrics + if cfg.knobs.metrics != nil { + metrics = cfg.knobs.metrics + } ctx = ds.AnnotateCtx(ctx) ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender") defer sp.Finish() @@ -217,7 +225,7 @@ func (ds *DistSender) RangeFeedSpans( // Spawn a child goroutine to process this feed. g.GoCtx(func(ctx context.Context) error { return ds.partialRangeFeed(ctx, rr, sri.rs, sri.startAfter, - sri.token, &catchupSem, rangeCh, eventCh, cfg) + sri.token, &catchupSem, rangeCh, eventCh, cfg, metrics) }) case <-ctx.Done(): return ctx.Err() @@ -442,12 +450,13 @@ func (ds *DistSender) partialRangeFeed( rangeCh chan<- singleRangeInfo, eventCh chan<- RangeFeedMessage, cfg rangeFeedConfig, + metrics *DistSenderRangeFeedMetrics, ) error { // Bound the partial rangefeed to the partial span. span := rs.AsRawSpanWithNoLocals() // Register partial range feed with registry. - active := newActiveRangeFeed(span, startAfter, rr, ds.metrics.RangefeedRanges) + active := newActiveRangeFeed(span, startAfter, rr, metrics.RangefeedRanges) defer active.release() // Start a retry loop for sending the batch to the range. @@ -473,7 +482,7 @@ func (ds *DistSender) partialRangeFeed( maxTS, err := ds.singleRangeFeed( ctx, span, startAfter, token.Desc(), - catchupSem, eventCh, active.onRangeEvent, cfg) + catchupSem, eventCh, active.onRangeEvent, cfg, metrics) // Forward the timestamp in case we end up sending it again. startAfter.Forward(maxTS) @@ -488,7 +497,7 @@ func (ds *DistSender) partialRangeFeed( if err != nil { return err } - ds.metrics.RangefeedRestartRanges.Inc(1) + metrics.RangefeedRestartRanges.Inc(1) if errInfo.evict { token.Evict(ctx) token = rangecache.EvictionToken{} @@ -570,18 +579,21 @@ func (a catchupAlloc) Release() { } func acquireCatchupScanQuota( - ctx context.Context, ds *DistSender, catchupSem *limit.ConcurrentRequestLimiter, + ctx context.Context, + sv *settings.Values, + catchupSem *limit.ConcurrentRequestLimiter, + metrics *DistSenderRangeFeedMetrics, ) (catchupAlloc, error) { // Indicate catchup scan is starting; Before potentially blocking on a semaphore, take // opportunity to update semaphore limit. - catchupSem.SetLimit(maxConcurrentCatchupScans(&ds.st.SV)) + catchupSem.SetLimit(maxConcurrentCatchupScans(sv)) res, err := catchupSem.Begin(ctx) if err != nil { return nil, err } - ds.metrics.RangefeedCatchupRanges.Inc(1) + metrics.RangefeedCatchupRanges.Inc(1) return func() { - ds.metrics.RangefeedCatchupRanges.Dec(1) + metrics.RangefeedCatchupRanges.Dec(1) res.Release() }, nil } @@ -672,6 +684,7 @@ func (ds *DistSender) singleRangeFeed( eventCh chan<- RangeFeedMessage, onRangeEvent onRangeEventCb, cfg rangeFeedConfig, + metrics *DistSenderRangeFeedMetrics, ) (_ hlc.Timestamp, retErr error) { // Ensure context is cancelled on all errors, to prevent gRPC stream leaks. ctx, cancelFeed := context.WithCancel(ctx) @@ -691,7 +704,7 @@ func (ds *DistSender) singleRangeFeed( // Indicate catchup scan is starting; Before potentially blocking on a semaphore, take // opportunity to update semaphore limit. 
- catchupRes, err := acquireCatchupScanQuota(ctx, ds, catchupSem) + catchupRes, err := acquireCatchupScanQuota(ctx, &ds.st.SV, catchupSem, metrics) if err != nil { return hlc.Timestamp{}, err } @@ -760,11 +773,21 @@ func (ds *DistSender) singleRangeFeed( log.VErrEventf(ctx, 2, "RPC error: %s", err) if stuckWatcher.stuck() { afterCatchUpScan := catchupRes == nil - return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold()) + return args.Timestamp, handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold(), metrics) } return args.Timestamp, err } + if cfg.knobs.onRangefeedEvent != nil { + skip, err := cfg.knobs.onRangefeedEvent(ctx, span, event) + if err != nil { + return args.Timestamp, err + } + if skip { + continue + } + } + msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span} switch t := event.GetValue().(type) { case *kvpb.RangeFeedCheckpoint: @@ -783,14 +806,14 @@ func (ds *DistSender) singleRangeFeed( case *kvpb.RangeFeedError: log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) if catchupRes != nil { - ds.metrics.RangefeedErrorCatchup.Inc(1) + metrics.RangefeedErrorCatchup.Inc(1) } if stuckWatcher.stuck() { // When the stuck watcher fired, and the rangefeed call is local, // the remote might notice the cancellation first and return from // Recv with an error that we need to special-case here. afterCatchUpScan := catchupRes == nil - return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold()) + return args.Timestamp, handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold(), metrics) } return args.Timestamp, t.Error.GoError() } @@ -812,10 +835,13 @@ func connectionClass(sv *settings.Values) rpc.ConnectionClass { return rpc.DefaultClass } -func (ds *DistSender) handleStuckEvent( - args *kvpb.RangeFeedRequest, afterCatchupScan bool, threshold time.Duration, +func handleStuckEvent( + args *kvpb.RangeFeedRequest, + afterCatchupScan bool, + threshold time.Duration, + m *DistSenderRangeFeedMetrics, ) error { - ds.metrics.RangefeedRestartStuck.Inc(1) + m.RangefeedRestartStuck.Inc(1) if afterCatchupScan { telemetry.Count("rangefeed.stuck.after-catchup-scan") } else { @@ -826,3 +852,23 @@ func (ds *DistSender) handleStuckEvent( // sentinel error returned when cancelling rangefeed when it is stuck. var errRestartStuckRange = errors.New("rangefeed restarting due to inactivity") + +// TestingWithOnRangefeedEvent returns a test only option to modify rangefeed event. +func TestingWithOnRangefeedEvent( + fn func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error), +) RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.knobs.onRangefeedEvent = fn + }) +} + +// TestingWithRangeFeedMetrics returns a test only option to specify metrics to +// use while executing this rangefeed. +func TestingWithRangeFeedMetrics(m *DistSenderRangeFeedMetrics) RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.knobs.metrics = m + }) +} + +// TestingMakeRangeFeedMetrics exposes makeDistSenderRangeFeedMetrics for test use. 
+var TestingMakeRangeFeedMetrics = makeDistSenderRangeFeedMetrics diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go index ef137233ae30..f967a97521a3 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -136,17 +136,17 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { stream.EXPECT().Send(gomock.Any()).Return(nil) stream.EXPECT().Recv().Do(func() { cancel() - }).Return(nil, context.Canceled) - client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil) + }).Return(nil, context.Canceled).AnyTimes() + client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil).AnyTimes() } else { stream := kvpbmock.NewMockInternal_RangeFeedClient(ctrl) stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) } - transport.EXPECT().IsExhausted().Return(false) - transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) - transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil) + transport.EXPECT().IsExhausted().Return(false).AnyTimes() + transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]).AnyTimes() + transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil).AnyTimes() transport.EXPECT().Release().AnyTimes() } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index abdf0c29d79e..0c09ea4a5ec8 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -32,10 +32,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" @@ -591,7 +593,9 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }) defer tc.Stopper().Stop(ctx) ts := tc.Server(0) @@ -624,12 +628,177 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) { var numErrors atomic.Int32 enoughErrors := make(chan struct{}) closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, noValuesExpected, true, - kvcoord.TestingWithOnMuxEvent(func(event *kvpb.MuxRangeFeedEvent) { - event.RangeFeedEvent = transientErrEvent - if numErrors.Add(1) == numErrsToReturn { - close(enoughErrors) - } - })) + kvcoord.TestingWithOnRangefeedEvent( + func(_ context.Context, _ roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error) { + *event = transientErrEvent + if numErrors.Add(1) == numErrsToReturn { + close(enoughErrors) + } + return false, nil + })) channelWaitWithTimeout(t, enoughErrors) closeFeed() } + +// Test to make sure the various metrics used by rangefeed are correctly +// updated during the lifetime of the rangefeed and when 
the rangefeed completes. +func TestRangeFeedMetricsManagement(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + + ts := tc.Server(0) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + startTime := ts.Clock().Now() + + // Insert 1000 rows, and split them into 10 ranges. + const numRanges = 10 + sqlDB.ExecMultiple(t, + `ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`, + `CREATE TABLE foo (key INT PRIMARY KEY)`, + `INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`, + `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 100))`, + ) + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + testutils.RunTrueAndFalse(t, "mux", func(t *testing.T, useMux bool) { + metrics := kvcoord.TestingMakeRangeFeedMetrics() + + // Number of ranges for which we'll issue transient error. + const numRangesToRetry int64 = 3 + // Number of ranges which we will block from completion. + const numCatchupToBlock int64 = 2 + + // Upon shutdown, make sure the metrics have correct values. + defer func() { + require.EqualValues(t, 0, metrics.RangefeedRanges.Value()) + require.EqualValues(t, 0, metrics.RangefeedRestartStuck.Count()) + + // We injected numRangesToRetry transient errors during catchup scan. + // It is possible however, that we will observe key-mismatch error when restarting + // due to how we split the ranges above (i.e. there is a version of the range + // that goes from e.g. 800-Max, and then there is correct version 800-900). + // When iterating through the entire table span, we pick up correct version. + // However, if we attempt to re-resolve single range, we may get incorrect/old + // version that was cached. Thus, we occasionally see additional transient restarts. + require.GreaterOrEqual(t, metrics.RangefeedErrorCatchup.Count(), numRangesToRetry) + require.GreaterOrEqual(t, metrics.RangefeedRestartRanges.Count(), numRangesToRetry) + + // Even though numCatchupToBlock ranges were blocked in the catchup scan phase, + // the counter should be 0 once rangefeed is done. + require.EqualValues(t, 0, metrics.RangefeedCatchupRanges.Value()) + }() + + frontier, err := span.MakeFrontier(fooSpan) + require.NoError(t, err) + + // This error causes rangefeed to restart. + transientErrEvent := kvpb.RangeFeedEvent{ + Error: &kvpb.RangeFeedError{Error: *kvpb.NewError(&kvpb.StoreNotFoundError{})}, + } + + var numRetried atomic.Int64 + var numCatchupBlocked atomic.Int64 + skipSet := struct { + syncutil.Mutex + stuck roachpb.SpanGroup // Spans that are stuck in catchup scan. + retry roachpb.SpanGroup // Spans we issued retry for. 
+ }{} + const kindRetry = true + const kindStuck = false + shouldSkip := func(k roachpb.Key, kind bool) bool { + skipSet.Lock() + defer skipSet.Unlock() + if kind == kindRetry { + return skipSet.retry.Contains(k) + } + return skipSet.stuck.Contains(k) + } + + ignoreValues := func(event kvcoord.RangeFeedMessage) {} + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, ignoreValues, useMux, + kvcoord.TestingWithRangeFeedMetrics(&metrics), + kvcoord.TestingWithOnRangefeedEvent( + func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error) { + switch t := event.GetValue().(type) { + case *kvpb.RangeFeedValue: + // If we previously arranged for the range to be skipped (stuck catchup scan), + // then skip any value that belongs to the skipped range. + // This is only needed for mux rangefeed, since regular rangefeed just blocks. + return useMux && shouldSkip(t.Key, kindStuck), nil + case *kvpb.RangeFeedCheckpoint: + if checkpoint := t; checkpoint.Span.Contains(s) { + if checkpoint.ResolvedTS.IsEmpty() { + return false, nil + } + + // Skip any subsequent checkpoint if we previously arranged for + // range to be skipped. + if useMux && shouldSkip(checkpoint.Span.Key, kindStuck) { + return true, nil + } + + if !shouldSkip(checkpoint.Span.Key, kindRetry) && numRetried.Add(1) <= numRangesToRetry { + // Return transient error for this range, but do this only once per range. + skipSet.Lock() + skipSet.retry.Add(checkpoint.Span) + skipSet.Unlock() + log.Infof(ctx, "skipping span %s", checkpoint.Span) + *event = transientErrEvent + return false, nil + } + + _, err := frontier.Forward(checkpoint.Span, checkpoint.ResolvedTS) + if err != nil { + return false, err + } + + if numCatchupBlocked.Add(1) <= numCatchupToBlock { + if useMux { + // Mux rangefeed can't block single range, so just skip this event + // and arrange for other events belonging to this range to be skipped as well. + skipSet.Lock() + skipSet.stuck.Add(checkpoint.Span) + skipSet.Unlock() + log.Infof(ctx, "skipping stuck span %s", checkpoint.Span) + return true /* skip */, nil + } + + // Regular rangefeed can block to prevent catchup completion until rangefeed is canceled. + return false, contextutil.RunWithTimeout(ctx, "wait-rf-timeout", time.Minute, + func(ctx context.Context) error { + <-ctx.Done() + return ctx.Err() + }) + } + } + } + + return false, nil + })) + defer closeFeed() + + // Wait for the test frontier to advance. Once it advances, + // we know the rangefeed is started, all ranges are running (even if some of them are blocked). + testutils.SucceedsWithin(t, func() error { + if frontier.Frontier().IsEmpty() { + return errors.Newf("waiting for frontier advance: %s", frontier.String()) + } + return nil + }, 10*time.Second) + + // At this point, we know the rangefeed for all ranges are running. + require.EqualValues(t, numRanges, metrics.RangefeedRanges.Value(), frontier.String()) + + // We also know that we have blocked numCatchupToBlock ranges in their catchup scan. + require.EqualValues(t, numCatchupToBlock, metrics.RangefeedCatchupRanges.Value()) + }) +}
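
For reference, a minimal sketch of how the new DistSenderRangeFeedMetrics group and the test-only rangefeed options introduced above could be wired up from outside the package. This is an illustration, not part of the patch: the package name and helper function below are hypothetical, and the sketch assumes the existing pkg/util/metric Registry API (metric.NewRegistry / Registry.AddMetricStruct).

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// registerRangeFeedMetrics is a hypothetical helper showing why the empty
// MetricStruct() marker method exists: it lets AddMetricStruct walk the
// struct's fields via reflection and register every gauge and counter in
// one call.
func registerRangeFeedMetrics() kvcoord.RangeFeedOption {
	registry := metric.NewRegistry()
	m := kvcoord.TestingMakeRangeFeedMetrics()
	registry.AddMetricStruct(m)

	// Handing the same struct to a rangefeed (as the new test above does via
	// TestingWithRangeFeedMetrics) makes its counters observable in isolation
	// from the DistSender's aggregate metrics.
	return kvcoord.TestingWithRangeFeedMetrics(&m)
}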