diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 58bf3dd0b93a..0f9fe7bf8f49 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -202,6 +202,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/closedts", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 8942f8cd60ef..0bf733b2da42 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -394,30 +394,50 @@ func (ca *changeAggregator) makeKVFeedCfg( initialHighWater, &ca.metrics.SchemaFeedMetrics, config.Opts.GetCanHandle()) } + monitoringCfg, err := makeKVFeedMonitoringCfg(ca.sliMetrics, ca.flowCtx.Cfg.Settings) + if err != nil { + return kvfeed.Config{}, err + } + return kvfeed.Config{ - Writer: buf, - Settings: cfg.Settings, - DB: cfg.DB, - Codec: cfg.Codec, - Clock: cfg.DB.Clock(), - Gossip: cfg.Gossip, - Spans: spans, - CheckpointSpans: ca.spec.Checkpoint.Spans, - CheckpointTimestamp: ca.spec.Checkpoint.Timestamp, - Targets: AllTargets(ca.spec.Feed), - Metrics: &ca.metrics.KVFeedMetrics, - OnBackfillCallback: ca.sliMetrics.getBackfillCallback(), - OnBackfillRangeCallback: ca.sliMetrics.getBackfillRangeCallback(), - MM: ca.kvFeedMemMon, - InitialHighWater: initialHighWater, - EndTime: config.EndTime, - WithDiff: filters.WithDiff, - NeedsInitialScan: needsInitialScan, - SchemaChangeEvents: schemaChange.EventClass, - SchemaChangePolicy: schemaChange.Policy, - SchemaFeed: sf, - Knobs: ca.knobs.FeedKnobs, - UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV), + Writer: buf, + Settings: cfg.Settings, + DB: cfg.DB, + Codec: cfg.Codec, + Clock: cfg.DB.Clock(), + Gossip: cfg.Gossip, + Spans: spans, + CheckpointSpans: ca.spec.Checkpoint.Spans, + CheckpointTimestamp: ca.spec.Checkpoint.Timestamp, + Targets: AllTargets(ca.spec.Feed), + Metrics: &ca.metrics.KVFeedMetrics, + MM: ca.kvFeedMemMon, + InitialHighWater: initialHighWater, + EndTime: config.EndTime, + WithDiff: filters.WithDiff, + NeedsInitialScan: needsInitialScan, + SchemaChangeEvents: schemaChange.EventClass, + SchemaChangePolicy: schemaChange.Policy, + SchemaFeed: sf, + Knobs: ca.knobs.FeedKnobs, + UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV), + MonitoringCfg: monitoringCfg, + }, nil +} + +func makeKVFeedMonitoringCfg( + sliMetrics *sliMetrics, settings *cluster.Settings, +) (kvfeed.MonitoringConfig, error) { + laggingRangesThreshold := changefeedbase.LaggingRangesThreshold.Get(&settings.SV) + laggingRangesInterval := changefeedbase.LaggingRangesPollingInterval.Get(&settings.SV) + + return kvfeed.MonitoringConfig{ + LaggingRangesCallback: sliMetrics.getLaggingRangesCallback(), + LaggingRangesThreshold: laggingRangesThreshold, + LaggingRangesPollingInterval: laggingRangesInterval, + + OnBackfillCallback: sliMetrics.getBackfillCallback(), + OnBackfillRangeCallback: sliMetrics.getBackfillRangeCallback(), }, nil } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 1a08f4c6e8bd..cafa223c150c 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -45,6 +45,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" @@ -1024,6 +1025,123 @@ func TestChangefeedInitialScan(t *testing.T) { cdcTest(t, testFn) } +// TestChangefeedLaggingRangesMetrics tests the behavior of the +// changefeed.lagging_ranges metric. +func TestChangefeedLaggingRangesMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + // Ensure a fast closed timestamp interval so ranges can catch up fast. + kvserver.RangeFeedRefreshInterval.Override( + context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond) + closedts.SideTransportCloseInterval.Override( + context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond) + closedts.TargetDuration.Override( + context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond) + + changefeedbase.LaggingRangesPollingInterval.Override( + context.Background(), &s.Server.ClusterSettings().SV, 25*time.Millisecond) + changefeedbase.LaggingRangesThreshold.Override( + context.Background(), &s.Server.ClusterSettings().SV, 250*time.Millisecond) + + skipMu := syncutil.Mutex{} + skippedRanges := map[string]struct{}{} + numRanges := 10 + numRangesToSkip := int64(4) + var stopSkip atomic.Bool + // `shouldSkip` continuously skips checkpoints for the first `numRangesToSkip` ranges it sees. + // skipping is disabled by setting `stopSkip` to true. + shouldSkip := func(event *roachpb.RangeFeedEvent) bool { + if stopSkip.Load() { + return false + } + switch event.GetValue().(type) { + case *roachpb.RangeFeedCheckpoint: + sp := event.Checkpoint.Span + skipMu.Lock() + defer skipMu.Unlock() + if _, ok := skippedRanges[sp.String()]; ok || int64(len(skippedRanges)) < numRangesToSkip { + skippedRanges[sp.String()] = struct{}{} + return true + } + } + return false + } + + knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs) + + knobs.FeedKnobs.RangefeedOptions = append(knobs.FeedKnobs.RangefeedOptions, kvcoord.TestingWithOnRangefeedEvent( + func(ctx context.Context, s roachpb.Span, event *roachpb.RangeFeedEvent) (skip bool, _ error) { + return shouldSkip(event), nil + }), + ) + + registry := s.Server.JobRegistry().(*jobs.Registry) + sli1, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t1") + require.NoError(t, err) + laggingRangesTier1 := sli1.LaggingRanges + sli2, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t2") + require.NoError(t, err) + laggingRangesTier2 := sli2.LaggingRanges + + assertLaggingRanges := func(tier string, expected int64) { + testutils.SucceedsWithin(t, func() error { + var laggingRangesObserved int64 + if tier == "t1" { + laggingRangesObserved = laggingRangesTier1.Value() + } else { + laggingRangesObserved = laggingRangesTier2.Value() + } + if laggingRangesObserved != expected { + return fmt.Errorf("expected %d lagging ranges, but found %d", expected, laggingRangesObserved) + } + return nil + }, 10*time.Second) + } + + sqlDB.Exec(t, fmt.Sprintf(` + CREATE TABLE foo (key INT PRIMARY KEY); + INSERT INTO foo (key) SELECT * FROM generate_series(1, %d); + ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, %d, 1)); + `, numRanges, numRanges-1)) + sqlDB.CheckQueryResults(t, `SELECT count(*) FROM [SHOW RANGES FROM TABLE foo]`, + [][]string{{fmt.Sprint(numRanges)}}, + ) + + feed1Tier1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1"`) + feed2Tier1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1"`) + feed3Tier2 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t2"`) + + assertLaggingRanges("t1", numRangesToSkip*2) + assertLaggingRanges("t2", numRangesToSkip) + + stopSkip.Store(true) + assertLaggingRanges("t1", 0) + assertLaggingRanges("t2", 0) + + stopSkip.Store(false) + assertLaggingRanges("t1", numRangesToSkip*2) + assertLaggingRanges("t2", numRangesToSkip) + + require.NoError(t, feed1Tier1.Close()) + assertLaggingRanges("t1", numRangesToSkip) + assertLaggingRanges("t2", numRangesToSkip) + + require.NoError(t, feed2Tier1.Close()) + assertLaggingRanges("t1", 0) + assertLaggingRanges("t2", numRangesToSkip) + + require.NoError(t, feed3Tier2.Close()) + assertLaggingRanges("t1", 0) + assertLaggingRanges("t2", 0) + } + // Can't run on tenants due to lack of SPLIT AT support (#54254) + cdcTest(t, testFn, feedTestNoTenants, feedTestEnterpriseSinks) +} + func TestChangefeedBackfillObservability(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1241,6 +1359,8 @@ func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) { func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(s.DB) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 292518490acd..df12624c28ed 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -507,6 +507,8 @@ func (s StatementOptions) getEnumValue(k string) (string, error) { return rawVal, nil } +// getDurationValue validates that the option `k` was supplied with a +// valid duration. func (s StatementOptions) getDurationValue(k string) (*time.Duration, error) { v, ok := s.m[k] if !ok { diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index cb3a12a36921..382e229023ca 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -279,3 +279,31 @@ var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting( "determines whether changefeed event processing integrates with elastic CPU control", false, ) + +var defaultLaggingRangesThreshold = 3 * time.Minute + +var defaultLaggingRangesPollingInterval = 1 * time.Minute + +// LaggingRangesThreshold specifies the duration by which a range must +// be lagging behind the present to be considered as 'lagging' behind in +// metrics. +var LaggingRangesThreshold = settings.RegisterDurationSetting( + settings.TenantWritable, + "changefeed.lagging_ranges_threshold", + "specifies the duration by which a range must be lagging behind the "+ + "present to be considered as 'lagging' behind in metrics. will be "+ + "removed in v23.2 in favor of a changefeed option", + defaultLaggingRangesThreshold, + settings.PositiveDuration, +) + +// LaggingRangesPollingInterval is the polling rate at which lagging ranges are +// checked and metrics are updated. +var LaggingRangesPollingInterval = settings.RegisterDurationSetting( + settings.TenantWritable, + "changefeed.lagging_ranges_polling_interval", + "the polling rate at which lagging ranges are checked and "+ + "corresponding metrics are updated. will be removed in v23.2 onwards", + defaultLaggingRangesPollingInterval, + settings.PositiveDuration, +) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 2b2b3ccd9e4e..cc43810b485f 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -15,6 +15,7 @@ package kvfeed import ( "context" "fmt" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" @@ -31,29 +32,46 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/span" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) -// Config configures a kvfeed. -type Config struct { - Settings *cluster.Settings - DB *kv.DB - Codec keys.SQLCodec - Clock *hlc.Clock - Gossip gossip.OptionalGossip - Spans []roachpb.Span - CheckpointSpans []roachpb.Span - CheckpointTimestamp hlc.Timestamp - Targets changefeedbase.Targets - Writer kvevent.Writer - Metrics *kvevent.Metrics +// MonitoringConfig is a set of callbacks which the kvfeed calls to provide +// the caller with information about the state of the kvfeed. +type MonitoringConfig struct { + // LaggingRangesCallback is called periodically with the number of lagging ranges + // in the kvfeed. + LaggingRangesCallback func(int64) + // LaggingRangesPollingInterval is how often the kv feed will poll for + // lagging ranges. + LaggingRangesPollingInterval time.Duration + // LaggingRangesThreshold is how far behind a range must be to be considered + // lagging. + LaggingRangesThreshold time.Duration + OnBackfillCallback func() func() OnBackfillRangeCallback func(int64) (func(), func()) - MM *mon.BytesMonitor - WithDiff bool - SchemaChangeEvents changefeedbase.SchemaChangeEventClass - SchemaChangePolicy changefeedbase.SchemaChangePolicy - SchemaFeed schemafeed.SchemaFeed +} + +// Config configures a kvfeed. +type Config struct { + Settings *cluster.Settings + DB *kv.DB + Codec keys.SQLCodec + Clock *hlc.Clock + Gossip gossip.OptionalGossip + Spans []roachpb.Span + CheckpointSpans []roachpb.Span + CheckpointTimestamp hlc.Timestamp + Targets changefeedbase.Targets + Writer kvevent.Writer + Metrics *kvevent.Metrics + MonitoringCfg MonitoringConfig + MM *mon.BytesMonitor + WithDiff bool + SchemaChangeEvents changefeedbase.SchemaChangeEventClass + SchemaChangePolicy changefeedbase.SchemaChangePolicy + SchemaFeed schemafeed.SchemaFeed // If true, the feed will begin with a dump of data at exactly the // InitialHighWater. This is a peculiar behavior. In general the @@ -87,7 +105,7 @@ func Run(ctx context.Context, cfg Config) error { settings: cfg.Settings, gossip: cfg.Gossip, db: cfg.DB, - onBackfillRangeCallback: cfg.OnBackfillRangeCallback, + onBackfillRangeCallback: cfg.MonitoringCfg.OnBackfillRangeCallback, } } var pff physicalFeedFactory @@ -101,6 +119,7 @@ func Run(ctx context.Context, cfg Config) error { return kvevent.NewMemBuffer(cfg.MM.MakeBoundAccount(), &cfg.Settings.SV, cfg.Metrics) } + g := ctxgroup.WithContext(ctx) f := newKVFeed( cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp, cfg.SchemaChangeEvents, cfg.SchemaChangePolicy, @@ -109,9 +128,10 @@ func Run(ctx context.Context, cfg Config) error { cfg.Codec, cfg.SchemaFeed, sc, pff, bf, cfg.UseMux, cfg.Knobs) - f.onBackfillCallback = cfg.OnBackfillCallback + f.onBackfillCallback = cfg.MonitoringCfg.OnBackfillCallback + f.rangeObserver = startLaggingRangesObserver(g, cfg.MonitoringCfg.LaggingRangesCallback, + cfg.MonitoringCfg.LaggingRangesPollingInterval, cfg.MonitoringCfg.LaggingRangesThreshold) - g := ctxgroup.WithContext(ctx) g.GoCtx(cfg.SchemaFeed.Run) g.GoCtx(f.run) err := g.Wait() @@ -155,6 +175,59 @@ func Run(ctx context.Context, cfg Config) error { return err } +func startLaggingRangesObserver( + g ctxgroup.Group, + updateLaggingRanges func(int64), + pollingInterval time.Duration, + threshold time.Duration, +) func(fn kvcoord.ForEachRangeFn) { + return func(fn kvcoord.ForEachRangeFn) { + g.GoCtx(func(ctx context.Context) error { + // Reset metrics on shutdown. + defer func() { + updateLaggingRanges(0) + }() + + var timer timeutil.Timer + defer timer.Stop() + timer.Reset(pollingInterval) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + timer.Read = true + + count := int64(0) + thresholdTS := timeutil.Now().Add(-1 * threshold) + err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error { + // The resolved timestamp of a range determines the timestamp which is caught up to. + // However, during catchup scans, this is not set. For catchup scans, we consider the + // time the partial rangefeed was created to be its resolved ts. Note that a range can + // restart due to a range split, transient error etc. In these cases you also expect + // to see a `CreatedTime` but no resolved timestamp. + ts := feed.Resolved + if ts.IsEmpty() { + ts = hlc.Timestamp{WallTime: feed.CreatedTime.UnixNano()} + } + + if ts.Less(hlc.Timestamp{WallTime: thresholdTS.UnixNano()}) { + count += 1 + } + return nil + }) + if err != nil { + return err + } + updateLaggingRanges(count) + timer.Reset(pollingInterval) + } + } + }) + } +} + // schemaChangeDetectedError is a sentinel error to indicate to Run() that the // schema change is stopping due to a schema change. This is handy to trigger // the context group to stop; the error is handled entirely in this package. @@ -178,6 +251,7 @@ type kvFeed struct { codec keys.SQLCodec onBackfillCallback func() func() + rangeObserver func(fn kvcoord.ForEachRangeFn) schemaChangeEvents changefeedbase.SchemaChangeEventClass schemaChangePolicy changefeedbase.SchemaChangePolicy @@ -473,11 +547,12 @@ func (f *kvFeed) runUntilTableEvent( g := ctxgroup.WithContext(ctx) physicalCfg := rangeFeedConfig{ - Spans: stps, - Frontier: resumeFrontier.Frontier(), - WithDiff: f.withDiff, - Knobs: f.knobs, - UseMux: f.useMux, + Spans: stps, + Frontier: resumeFrontier.Frontier(), + WithDiff: f.withDiff, + Knobs: f.knobs, + UseMux: f.useMux, + RangeObserver: f.rangeObserver, } g.GoCtx(func(ctx context.Context) error { diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 8ed3f8feceb1..4a3e11c5628c 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -27,11 +27,12 @@ type physicalFeedFactory interface { } type rangeFeedConfig struct { - Frontier hlc.Timestamp - Spans []kvcoord.SpanTimePair - WithDiff bool - Knobs TestingKnobs - UseMux bool + Frontier hlc.Timestamp + Spans []kvcoord.SpanTimePair + WithDiff bool + RangeObserver func(fn kvcoord.ForEachRangeFn) + Knobs TestingKnobs + UseMux bool } type rangefeedFactory func( @@ -79,6 +80,12 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang if cfg.UseMux { rfOpts = append(rfOpts, kvcoord.WithMuxRangeFeed()) } + if cfg.RangeObserver != nil { + rfOpts = append(rfOpts, kvcoord.WithRangeObserver(cfg.RangeObserver)) + } + if len(cfg.Knobs.RangefeedOptions) != 0 { + rfOpts = append(rfOpts, cfg.Knobs.RangefeedOptions...) + } g.GoCtx(func(ctx context.Context) error { return p(ctx, cfg.Spans, cfg.WithDiff, feed.eventC, rfOpts...) diff --git a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go index dd87b86979f7..f9f6257532a5 100644 --- a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go +++ b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go @@ -29,6 +29,8 @@ type TestingKnobs struct { // EndTimeReached is a callback that may return true to indicate the // feed should exit because its end time has been reached. EndTimeReached func() bool + // RangefeedOptions lets the kvfeed override rangefeed settings. + RangefeedOptions []kvcoord.RangeFeedOption } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 8401bd51e114..cec7595cf8c6 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -66,6 +66,7 @@ type AggMetrics struct { SchemaRegistryRetries *aggmetric.AggCounter AggregatorProgress *aggmetric.AggGauge CheckpointProgress *aggmetric.AggGauge + LaggingRanges *aggmetric.AggGauge // There is always at least 1 sliMetrics created for defaultSLI scope. mu struct { @@ -123,6 +124,7 @@ type sliMetrics struct { SchemaRegistryRetries *aggmetric.Counter AggregatorProgress *aggmetric.Gauge CheckpointProgress *aggmetric.Gauge + LaggingRanges *aggmetric.Gauge mu struct { syncutil.Mutex @@ -563,6 +565,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { Measurement: "Unix Timestamp Nanoseconds", Unit: metric.Unit_TIMESTAMP_NS, } + metaLaggingRangePercentage := metric.Metadata{ + Name: "changefeed.lagging_ranges", + Help: "The number of ranges considered to be lagging behind", + Measurement: "Ranges", + Unit: metric.Unit_COUNT, + } functionalGaugeMinFn := func(childValues []int64) int64 { var min int64 @@ -628,6 +636,7 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { SchemaRegistrations: b.Counter(metaSchemaRegistryRegistrations), AggregatorProgress: b.FunctionalGauge(metaAggregatorProgress, functionalGaugeMinFn), CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn), + LaggingRanges: b.Gauge(metaLaggingRangePercentage), } a.mu.sliMetrics = make(map[string]*sliMetrics) _, err := a.getOrCreateScope(defaultSLIScope) @@ -684,6 +693,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { InternalRetryMessageCount: a.InternalRetryMessageCount.AddChild(scope), SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope), SchemaRegistrations: a.SchemaRegistrations.AddChild(scope), + LaggingRanges: a.LaggingRanges.AddChild(scope), } sm.mu.resolved = make(map[int64]hlc.Timestamp) sm.mu.checkpoint = make(map[int64]hlc.Timestamp) @@ -713,6 +723,36 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { return sm, nil } +// getLaggingRangesCallback returns a function which can be called to update the +// lagging ranges metric. It should be called with the current number of lagging +// ranges. +func (s *sliMetrics) getLaggingRangesCallback() func(int64) { + // Because this gauge is shared between changefeeds in the same metrics scope, + // we must instead modify it using `Inc` and `Dec` (as opposed to `Update`) to + // ensure values written by others are not overwritten. The code below is used + // to determine the deltas based on the last known number of lagging ranges. + // + // Example: + // + // Initially there are 0 lagging ranges, so `last` is 0. Assume the gauge + // has an arbitrary value X. + // + // If 10 ranges are behind, last=0,i=10: X.Dec(0 - 10) = X.Inc(10) + // If 3 ranges catch up, last=10,i=7: X.Dec(10 - 7) = X.Dec(3) + // If 4 ranges fall behind, last=7,i=11: X.Dec(7 - 11) = X.Inc(4) + // If 1 lagging range is deleted, last=7,i=10: X.Dec(11-10) = X.Dec(1) + last := struct { + syncutil.Mutex + v int64 + }{} + return func(i int64) { + last.Lock() + defer last.Unlock() + s.LaggingRanges.Dec(last.v - i) + last.v = i + } +} + // Metrics are for production monitoring of changefeeds. type Metrics struct { AggMetrics *AggMetrics diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index e2b10b25c980..daaab9ef5e50 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -84,9 +84,20 @@ func maxConcurrentCatchupScans(sv *settings.Values) int { return int(l) } +// ForEachRangeFn is used to execute `fn` over each range in a rangefeed. +type ForEachRangeFn func(fn ActiveRangeFeedIterFn) error + type rangeFeedConfig struct { useMuxRangeFeed bool overSystemTable bool + rangeObserver func(ForEachRangeFn) + + knobs struct { + // onRangefeedEvent invoked on each rangefeed event. + // Returns boolean indicating if event should be skipped or an error + // indicating if rangefeed should terminate. + onRangefeedEvent func(ctx context.Context, s roachpb.Span, event *roachpb.RangeFeedEvent) (skip bool, _ error) + } } // RangeFeedOption configures a RangeFeed. @@ -113,6 +124,14 @@ func WithSystemTablePriority() RangeFeedOption { }) } +// WithRangeObserver is called when the rangefeed starts with a function that +// can be used to iterate over all the ranges. +func WithRangeObserver(observer func(ForEachRangeFn)) RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.rangeObserver = observer + }) +} + // A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation. var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true) @@ -180,6 +199,9 @@ func (ds *DistSender) RangeFeedSpans( rr := newRangeFeedRegistry(ctx, withDiff) ds.activeRangeFeeds.Store(rr, nil) defer ds.activeRangeFeeds.Delete(rr) + if cfg.rangeObserver != nil { + cfg.rangeObserver(rr.ForEachPartialRangefeed) + } catchupSem := limit.MakeConcurrentRequestLimiter( "distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV)) @@ -254,34 +276,42 @@ type PartialRangeFeed struct { // ActiveRangeFeedIterFn is an iterator function which is passed PartialRangeFeed structure. // Iterator function may return an iterutil.StopIteration sentinel error to stop iteration -// early; any other error is propagated. +// early. type ActiveRangeFeedIterFn func(rfCtx RangeFeedContext, feed PartialRangeFeed) error -// ForEachActiveRangeFeed invokes provided function for each active range feed. +const continueIter = true +const stopIter = false + +// ForEachActiveRangeFeed invokes provided function for each active rangefeed. +// iterutil.StopIteration can be returned by `fn` to stop iteration, and doing +// so will not return this error. func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr error) { - const continueIter = true - const stopIter = false + ds.activeRangeFeeds.Range(func(k, v interface{}) bool { + r := k.(*rangeFeedRegistry) + iterErr = r.ForEachPartialRangefeed(fn) + return iterErr == nil + }) + return iterutil.Map(iterErr) +} + +// ForEachPartialRangefeed invokes provided function for each partial rangefeed. Use manageIterationErrs +// if the fn uses iterutil.StopIteration to stop iteration. +func (r *rangeFeedRegistry) ForEachPartialRangefeed(fn ActiveRangeFeedIterFn) (iterErr error) { partialRangeFeed := func(active *activeRangeFeed) PartialRangeFeed { active.Lock() defer active.Unlock() return active.PartialRangeFeed } - - ds.activeRangeFeeds.Range(func(k, v interface{}) bool { - r := k.(*rangeFeedRegistry) - r.ranges.Range(func(k, v interface{}) bool { - active := k.(*activeRangeFeed) - if err := fn(r.RangeFeedContext, partialRangeFeed(active)); err != nil { - iterErr = err - return stopIter - } - return continueIter - }) - return iterErr == nil + r.ranges.Range(func(k, v interface{}) bool { + active := k.(*activeRangeFeed) + if err := fn(r.RangeFeedContext, partialRangeFeed(active)); err != nil { + iterErr = err + return stopIter + } + return continueIter }) - - return iterutil.Map(iterErr) + return iterErr } // activeRangeFeed is a thread safe PartialRangeFeed. @@ -660,6 +690,16 @@ func (ds *DistSender) singleRangeFeed( return args.Timestamp, err } + if cfg.knobs.onRangefeedEvent != nil { + skip, err := cfg.knobs.onRangefeedEvent(ctx, span, event) + if err != nil { + return args.Timestamp, err + } + if skip { + continue + } + } + msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span} switch t := event.GetValue().(type) { case *roachpb.RangeFeedCheckpoint: @@ -736,5 +776,14 @@ func (ds *DistSender) handleStuckEvent( return errors.Wrapf(errRestartStuckRange, "waiting for r%d %s [threshold %s]", args.RangeID, args.Replica, threshold) } +// TestingWithOnRangefeedEvent returns a test only option to modify rangefeed event. +func TestingWithOnRangefeedEvent( + fn func(ctx context.Context, s roachpb.Span, event *roachpb.RangeFeedEvent) (skip bool, _ error), +) RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.knobs.onRangefeedEvent = fn + }) +} + // sentinel error returned when cancelling rangefeed when it is stuck. var errRestartStuckRange = errors.New("rangefeed restarting due to inactivity") diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index 2ca094e3573e..f31c16b8ff6e 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -12,6 +12,8 @@ package kvcoord_test import ( "context" + "fmt" + "reflect" "sync/atomic" "testing" "time" @@ -191,6 +193,7 @@ func rangeFeed( startFrom hlc.Timestamp, onValue func(event kvcoord.RangeFeedMessage), useMuxRangeFeed bool, + opts ...kvcoord.RangeFeedOption, ) func() { ds := dsI.(*kvcoord.DistSender) events := make(chan kvcoord.RangeFeedMessage) @@ -198,7 +201,6 @@ func rangeFeed( g := ctxgroup.WithContext(ctx) g.GoCtx(func(ctx context.Context) (err error) { - var opts []kvcoord.RangeFeedOption if useMuxRangeFeed { opts = append(opts, kvcoord.WithMuxRangeFeed()) ctx = context.WithValue(ctx, useMuxRangeFeedCtxKey{}, struct{}{}) @@ -584,3 +586,114 @@ func TestRestartsStuckRangeFeedsSecondImplementation(t *testing.T) { // retry. require.NotZero(t, ds.Metrics().RangefeedRestartStuck.Count()) } + +// TestRangefeedRangeObserver ensures the kvcoord.WithRangeObserver option +// works correctly. +func TestRangefeedRangeObserver(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + ts := tc.Server(0) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + kvserver.RangefeedEnabled.Override( + context.Background(), &ts.ClusterSettings().SV, true) + + testutils.RunTrueAndFalse(t, "mux", func(t *testing.T, useMux bool) { + sqlDB.ExecMultiple(t, + `CREATE TABLE foo (key INT PRIMARY KEY)`, + `INSERT INTO foo (key) SELECT * FROM generate_series(1, 4)`, + `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, 4, 1))`, + ) + defer func() { + sqlDB.Exec(t, `DROP TABLE foo`) + }() + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + ignoreValues := func(event kvcoord.RangeFeedMessage) {} + + // Set up an observer to continuously poll for the list of ranges + // being watched. + var observedRangesMu syncutil.Mutex + observedRanges := make(map[string]struct{}) + ctx2, cancel := context.WithCancel(context.Background()) + g := ctxgroup.WithContext(ctx2) + defer func() { + cancel() + err := g.Wait() + // Ensure the observer goroutine terminates gracefully via context cancellation. + require.True(t, testutils.IsError(err, "context canceled")) + }() + observer := func(fn kvcoord.ForEachRangeFn) { + g.GoCtx(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(200 * time.Millisecond): + } + observedRangesMu.Lock() + observedRanges = make(map[string]struct{}) + err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error { + observedRanges[feed.Span.String()] = struct{}{} + return nil + }) + observedRangesMu.Unlock() + if err != nil { + return err + } + } + }) + } + + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, ts.Clock().Now(), ignoreValues, useMux, + kvcoord.WithRangeObserver(observer)) + defer closeFeed() + + makeSpan := func(suffix string) string { + return fmt.Sprintf("/Table/%d/%s", fooDesc.GetID(), suffix) + } + + // The initial set of ranges we expect to observe. + expectedRanges := map[string]struct{}{ + makeSpan("1{-/1}"): {}, + makeSpan("1/{1-2}"): {}, + makeSpan("1/{2-3}"): {}, + makeSpan("1/{3-4}"): {}, + makeSpan("{1/4-2}"): {}, + } + checkExpectedRanges := func() { + testutils.SucceedsWithin(t, func() error { + observedRangesMu.Lock() + defer observedRangesMu.Unlock() + if !reflect.DeepEqual(observedRanges, expectedRanges) { + return errors.Newf("expected ranges %v, but got %v", expectedRanges, observedRanges) + } + return nil + }, 10*time.Second) + } + checkExpectedRanges() + + // Add another range and ensure we can observe it. + sqlDB.ExecMultiple(t, + `INSERT INTO FOO VALUES(5)`, + `ALTER TABLE foo SPLIT AT VALUES(5)`, + ) + expectedRanges = map[string]struct{}{ + makeSpan("1{-/1}"): {}, + makeSpan("1/{1-2}"): {}, + makeSpan("1/{2-3}"): {}, + makeSpan("1/{3-4}"): {}, + makeSpan("1/{4-5}"): {}, + makeSpan("{1/5-2}"): {}, + } + checkExpectedRanges() + }) +} diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index f0b5fd19d892..c94ff2669a5f 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1582,6 +1582,12 @@ var charts = []sectionDescription{ "changefeed.checkpoint_progress", }, }, + { + Title: "Changefeed Lagging Ranges", + Metrics: []string{ + "changefeed.lagging_ranges", + }, + }, }, }, {