From 558f9fdb0ad1727cf1356714092f7527ad55b5f4 Mon Sep 17 00:00:00 2001
From: Shiranka Miskin
Date: Tue, 20 Jun 2023 11:50:16 -0400
Subject: [PATCH 1/2] changefeedccl: add `changefeed.lagging_ranges` metric

This change is a backport of the commit from #109835. The original change uses changefeed options to configure the lagging ranges metric. Because changefeed options require version gates, they cannot be backported. This change instead uses cluster settings.

This change adds the `changefeed.lagging_ranges` metric, which can be used to track ranges that are behind. The metric is calculated based on a new cluster setting, `changefeed.lagging_ranges_threshold`, which is the amount of time that a range checkpoint needs to be in the past for the range to be considered lagging. This defaults to 3 minutes.

This change also adds the cluster setting `changefeed.lagging_ranges_polling_interval`, which is the rate at which a rangefeed polls for lagging ranges and updates the metric. This defaults to 1 minute.

Sometimes a range may not have any checkpoints for a while because the start time may be far in the past (this causes a catchup scan, during which no checkpoints are emitted). In this case, the range is considered to be lagging if the created timestamp of the rangefeed is older than `changefeed.lagging_ranges_threshold`. Note that this means that any changefeed which starts with an initial scan containing a significant amount of data will likely report a nonzero `changefeed.lagging_ranges` value until the initial scan is complete. This is intentional.

Release note (ops change): A new metric `changefeed.lagging_ranges` is added to show the number of ranges which are behind in changefeeds. This metric can be used with the `metrics_label` changefeed option. A new cluster setting `changefeed.lagging_ranges_threshold` is added, which is the amount of time a range needs to be behind to be considered lagging. By default this is 3 minutes. There is also a new cluster setting `changefeed.lagging_ranges_polling_interval`, which controls how often the lagging ranges calculation is done. This setting defaults to polling every 1 minute. Note that polling adds latency to the metric being updated. For example, if a range falls behind by 3 minutes, the metric may not update until an additional minute afterwards. Also note that ranges undergoing an initial scan for longer than the threshold are considered to be lagging. Starting a changefeed with an initial scan on a large table will likely increment the metric for each range in the table; as ranges complete the initial scan, the reported count will decrease.
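For illustration only (not part of this patch), the new settings and the metric scope might be used roughly as follows; the table name, sink URI, and label are placeholders:

    SET CLUSTER SETTING changefeed.lagging_ranges_threshold = '5m';
    SET CLUSTER SETTING changefeed.lagging_ranges_polling_interval = '30s';
    CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...' WITH metrics_label = 'orders';

With a label set, the lagging ranges count is reported under that metrics scope, as exercised by the `t1`/`t2` labels in the test added below.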
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-9181 --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + .../changefeedccl/changefeed_processors.go | 66 +++++---- pkg/ccl/changefeedccl/changefeed_test.go | 118 ++++++++++++++++ .../changefeedccl/changefeedbase/options.go | 2 + .../changefeedccl/changefeedbase/settings.go | 28 ++++ pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 127 ++++++++++++++---- .../changefeedccl/kvfeed/physical_kv_feed.go | 17 ++- pkg/ccl/changefeedccl/kvfeed/testing_knobs.go | 2 + pkg/ccl/changefeedccl/metrics.go | 35 +++++ .../kvclient/kvcoord/dist_sender_rangefeed.go | 85 +++++++++--- .../kvcoord/dist_sender_rangefeed_test.go | 115 +++++++++++++++- pkg/ts/catalog/chart_catalog.go | 6 + 12 files changed, 529 insertions(+), 73 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 58bf3dd0b93a..0f9fe7bf8f49 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -202,6 +202,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/closedts", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 8942f8cd60ef..0bf733b2da42 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -394,30 +394,50 @@ func (ca *changeAggregator) makeKVFeedCfg( initialHighWater, &ca.metrics.SchemaFeedMetrics, config.Opts.GetCanHandle()) } + monitoringCfg, err := makeKVFeedMonitoringCfg(ca.sliMetrics, ca.flowCtx.Cfg.Settings) + if err != nil { + return kvfeed.Config{}, err + } + return kvfeed.Config{ - Writer: buf, - Settings: cfg.Settings, - DB: cfg.DB, - Codec: cfg.Codec, - Clock: cfg.DB.Clock(), - Gossip: cfg.Gossip, - Spans: spans, - CheckpointSpans: ca.spec.Checkpoint.Spans, - CheckpointTimestamp: ca.spec.Checkpoint.Timestamp, - Targets: AllTargets(ca.spec.Feed), - Metrics: &ca.metrics.KVFeedMetrics, - OnBackfillCallback: ca.sliMetrics.getBackfillCallback(), - OnBackfillRangeCallback: ca.sliMetrics.getBackfillRangeCallback(), - MM: ca.kvFeedMemMon, - InitialHighWater: initialHighWater, - EndTime: config.EndTime, - WithDiff: filters.WithDiff, - NeedsInitialScan: needsInitialScan, - SchemaChangeEvents: schemaChange.EventClass, - SchemaChangePolicy: schemaChange.Policy, - SchemaFeed: sf, - Knobs: ca.knobs.FeedKnobs, - UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV), + Writer: buf, + Settings: cfg.Settings, + DB: cfg.DB, + Codec: cfg.Codec, + Clock: cfg.DB.Clock(), + Gossip: cfg.Gossip, + Spans: spans, + CheckpointSpans: ca.spec.Checkpoint.Spans, + CheckpointTimestamp: ca.spec.Checkpoint.Timestamp, + Targets: AllTargets(ca.spec.Feed), + Metrics: &ca.metrics.KVFeedMetrics, + MM: ca.kvFeedMemMon, + InitialHighWater: initialHighWater, + EndTime: config.EndTime, + WithDiff: filters.WithDiff, + NeedsInitialScan: needsInitialScan, + SchemaChangeEvents: schemaChange.EventClass, + SchemaChangePolicy: schemaChange.Policy, + SchemaFeed: sf, + Knobs: ca.knobs.FeedKnobs, + UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV), + MonitoringCfg: monitoringCfg, + }, nil +} + +func makeKVFeedMonitoringCfg( + sliMetrics *sliMetrics, settings *cluster.Settings, +) (kvfeed.MonitoringConfig, error) { + laggingRangesThreshold := changefeedbase.LaggingRangesThreshold.Get(&settings.SV) + laggingRangesInterval := 
changefeedbase.LaggingRangesPollingInterval.Get(&settings.SV) + + return kvfeed.MonitoringConfig{ + LaggingRangesCallback: sliMetrics.getLaggingRangesCallback(), + LaggingRangesThreshold: laggingRangesThreshold, + LaggingRangesPollingInterval: laggingRangesInterval, + + OnBackfillCallback: sliMetrics.getBackfillCallback(), + OnBackfillRangeCallback: sliMetrics.getBackfillRangeCallback(), }, nil } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 1a08f4c6e8bd..381bb7e3c6e2 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -45,6 +45,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" @@ -1024,6 +1025,123 @@ func TestChangefeedInitialScan(t *testing.T) { cdcTest(t, testFn) } +// TestChangefeedLaggingRangesMetrics tests the behavior of the +// changefeed.lagging_ranges metric. +func TestChangefeedLaggingRangesMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + // Ensure a fast closed timestamp interval so ranges can catch up fast. + kvserver.RangeFeedRefreshInterval.Override( + context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond) + closedts.SideTransportCloseInterval.Override( + context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond) + closedts.TargetDuration.Override( + context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond) + + changefeedbase.LaggingRangesPollingInterval.Override( + context.Background(), &s.Server.ClusterSettings().SV, 25*time.Millisecond) + changefeedbase.LaggingRangesThreshold.Override( + context.Background(), &s.Server.ClusterSettings().SV, 250*time.Millisecond) + + skipMu := syncutil.Mutex{} + skippedRanges := map[string]struct{}{} + numRanges := 10 + numRangesToSkip := int64(4) + var stopSkip atomic.Bool + // `shouldSkip` continuously skips checkpoints for the first `numRangesToSkip` ranges it sees. + // skipping is disabled by setting `stopSkip` to true. 
+ shouldSkip := func(event *roachpb.RangeFeedEvent) bool { + if stopSkip.Load() { + return false + } + switch event.GetValue().(type) { + case *roachpb.RangeFeedCheckpoint: + sp := event.Checkpoint.Span + skipMu.Lock() + defer skipMu.Unlock() + if _, ok := skippedRanges[sp.String()]; ok || int64(len(skippedRanges)) < numRangesToSkip { + skippedRanges[sp.String()] = struct{}{} + return true + } + } + return false + } + + knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs) + + knobs.FeedKnobs.RangefeedOptions = append(knobs.FeedKnobs.RangefeedOptions, kvcoord.TestingWithOnRangefeedEvent( + func(ctx context.Context, s roachpb.Span, event *roachpb.RangeFeedEvent) (skip bool, _ error) { + return shouldSkip(event), nil + }), + ) + + registry := s.Server.JobRegistry().(*jobs.Registry) + sli1, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t1") + require.NoError(t, err) + laggingRangesTier1 := sli1.LaggingRanges + sli2, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t2") + require.NoError(t, err) + laggingRangesTier2 := sli2.LaggingRanges + + assertLaggingRanges := func(tier string, expected int64) { + testutils.SucceedsWithin(t, func() error { + var laggingRangesObserved int64 + if tier == "t1" { + laggingRangesObserved = laggingRangesTier1.Value() + } else { + laggingRangesObserved = laggingRangesTier2.Value() + } + if laggingRangesObserved != expected { + return fmt.Errorf("expected %d lagging ranges, but found %d", expected, laggingRangesObserved) + } + return nil + }, 10*time.Second) + } + + sqlDB.Exec(t, fmt.Sprintf(` + CREATE TABLE foo (key INT PRIMARY KEY); + INSERT INTO foo (key) SELECT * FROM generate_series(1, %d); + ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, %d, 1)); + `, numRanges, numRanges-1)) + sqlDB.CheckQueryResults(t, `SELECT count(*) FROM [SHOW RANGES FROM TABLE foo]`, + [][]string{{fmt.Sprint(numRanges)}}, + ) + + feed1Tier1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1"`) + feed2Tier1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1"`) + feed3Tier2 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t2"`) + + assertLaggingRanges("t1", numRangesToSkip*2) + assertLaggingRanges("t2", numRangesToSkip) + + stopSkip.Store(true) + assertLaggingRanges("t1", 0) + assertLaggingRanges("t2", 0) + + stopSkip.Store(false) + assertLaggingRanges("t1", numRangesToSkip*2) + assertLaggingRanges("t2", numRangesToSkip) + + require.NoError(t, feed1Tier1.Close()) + assertLaggingRanges("t1", numRangesToSkip) + assertLaggingRanges("t2", numRangesToSkip) + + require.NoError(t, feed2Tier1.Close()) + assertLaggingRanges("t1", 0) + assertLaggingRanges("t2", numRangesToSkip) + + require.NoError(t, feed3Tier2.Close()) + assertLaggingRanges("t1", 0) + assertLaggingRanges("t2", 0) + } + // Can't run on tenants due to lack of SPLIT AT support (#54254) + cdcTest(t, testFn, feedTestNoTenants, feedTestEnterpriseSinks) +} + func TestChangefeedBackfillObservability(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 292518490acd..df12624c28ed 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -507,6 +507,8 @@ func (s StatementOptions) getEnumValue(k string) (string, error) { return rawVal, nil } +// getDurationValue 
validates that the option `k` was supplied with a +// valid duration. func (s StatementOptions) getDurationValue(k string) (*time.Duration, error) { v, ok := s.m[k] if !ok { diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index cb3a12a36921..382e229023ca 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -279,3 +279,31 @@ var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting( "determines whether changefeed event processing integrates with elastic CPU control", false, ) + +var defaultLaggingRangesThreshold = 3 * time.Minute + +var defaultLaggingRangesPollingInterval = 1 * time.Minute + +// LaggingRangesThreshold specifies the duration by which a range must +// be lagging behind the present to be considered as 'lagging' behind in +// metrics. +var LaggingRangesThreshold = settings.RegisterDurationSetting( + settings.TenantWritable, + "changefeed.lagging_ranges_threshold", + "specifies the duration by which a range must be lagging behind the "+ + "present to be considered as 'lagging' behind in metrics. will be "+ + "removed in v23.2 in favor of a changefeed option", + defaultLaggingRangesThreshold, + settings.PositiveDuration, +) + +// LaggingRangesPollingInterval is the polling rate at which lagging ranges are +// checked and metrics are updated. +var LaggingRangesPollingInterval = settings.RegisterDurationSetting( + settings.TenantWritable, + "changefeed.lagging_ranges_polling_interval", + "the polling rate at which lagging ranges are checked and "+ + "corresponding metrics are updated. will be removed in v23.2 onwards", + defaultLaggingRangesPollingInterval, + settings.PositiveDuration, +) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 2b2b3ccd9e4e..cc43810b485f 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -15,6 +15,7 @@ package kvfeed import ( "context" "fmt" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" @@ -31,29 +32,46 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/span" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) -// Config configures a kvfeed. -type Config struct { - Settings *cluster.Settings - DB *kv.DB - Codec keys.SQLCodec - Clock *hlc.Clock - Gossip gossip.OptionalGossip - Spans []roachpb.Span - CheckpointSpans []roachpb.Span - CheckpointTimestamp hlc.Timestamp - Targets changefeedbase.Targets - Writer kvevent.Writer - Metrics *kvevent.Metrics +// MonitoringConfig is a set of callbacks which the kvfeed calls to provide +// the caller with information about the state of the kvfeed. +type MonitoringConfig struct { + // LaggingRangesCallback is called periodically with the number of lagging ranges + // in the kvfeed. + LaggingRangesCallback func(int64) + // LaggingRangesPollingInterval is how often the kv feed will poll for + // lagging ranges. + LaggingRangesPollingInterval time.Duration + // LaggingRangesThreshold is how far behind a range must be to be considered + // lagging. 
+ LaggingRangesThreshold time.Duration + OnBackfillCallback func() func() OnBackfillRangeCallback func(int64) (func(), func()) - MM *mon.BytesMonitor - WithDiff bool - SchemaChangeEvents changefeedbase.SchemaChangeEventClass - SchemaChangePolicy changefeedbase.SchemaChangePolicy - SchemaFeed schemafeed.SchemaFeed +} + +// Config configures a kvfeed. +type Config struct { + Settings *cluster.Settings + DB *kv.DB + Codec keys.SQLCodec + Clock *hlc.Clock + Gossip gossip.OptionalGossip + Spans []roachpb.Span + CheckpointSpans []roachpb.Span + CheckpointTimestamp hlc.Timestamp + Targets changefeedbase.Targets + Writer kvevent.Writer + Metrics *kvevent.Metrics + MonitoringCfg MonitoringConfig + MM *mon.BytesMonitor + WithDiff bool + SchemaChangeEvents changefeedbase.SchemaChangeEventClass + SchemaChangePolicy changefeedbase.SchemaChangePolicy + SchemaFeed schemafeed.SchemaFeed // If true, the feed will begin with a dump of data at exactly the // InitialHighWater. This is a peculiar behavior. In general the @@ -87,7 +105,7 @@ func Run(ctx context.Context, cfg Config) error { settings: cfg.Settings, gossip: cfg.Gossip, db: cfg.DB, - onBackfillRangeCallback: cfg.OnBackfillRangeCallback, + onBackfillRangeCallback: cfg.MonitoringCfg.OnBackfillRangeCallback, } } var pff physicalFeedFactory @@ -101,6 +119,7 @@ func Run(ctx context.Context, cfg Config) error { return kvevent.NewMemBuffer(cfg.MM.MakeBoundAccount(), &cfg.Settings.SV, cfg.Metrics) } + g := ctxgroup.WithContext(ctx) f := newKVFeed( cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp, cfg.SchemaChangeEvents, cfg.SchemaChangePolicy, @@ -109,9 +128,10 @@ func Run(ctx context.Context, cfg Config) error { cfg.Codec, cfg.SchemaFeed, sc, pff, bf, cfg.UseMux, cfg.Knobs) - f.onBackfillCallback = cfg.OnBackfillCallback + f.onBackfillCallback = cfg.MonitoringCfg.OnBackfillCallback + f.rangeObserver = startLaggingRangesObserver(g, cfg.MonitoringCfg.LaggingRangesCallback, + cfg.MonitoringCfg.LaggingRangesPollingInterval, cfg.MonitoringCfg.LaggingRangesThreshold) - g := ctxgroup.WithContext(ctx) g.GoCtx(cfg.SchemaFeed.Run) g.GoCtx(f.run) err := g.Wait() @@ -155,6 +175,59 @@ func Run(ctx context.Context, cfg Config) error { return err } +func startLaggingRangesObserver( + g ctxgroup.Group, + updateLaggingRanges func(int64), + pollingInterval time.Duration, + threshold time.Duration, +) func(fn kvcoord.ForEachRangeFn) { + return func(fn kvcoord.ForEachRangeFn) { + g.GoCtx(func(ctx context.Context) error { + // Reset metrics on shutdown. + defer func() { + updateLaggingRanges(0) + }() + + var timer timeutil.Timer + defer timer.Stop() + timer.Reset(pollingInterval) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + timer.Read = true + + count := int64(0) + thresholdTS := timeutil.Now().Add(-1 * threshold) + err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error { + // The resolved timestamp of a range determines the timestamp which is caught up to. + // However, during catchup scans, this is not set. For catchup scans, we consider the + // time the partial rangefeed was created to be its resolved ts. Note that a range can + // restart due to a range split, transient error etc. In these cases you also expect + // to see a `CreatedTime` but no resolved timestamp. 
+ ts := feed.Resolved + if ts.IsEmpty() { + ts = hlc.Timestamp{WallTime: feed.CreatedTime.UnixNano()} + } + + if ts.Less(hlc.Timestamp{WallTime: thresholdTS.UnixNano()}) { + count += 1 + } + return nil + }) + if err != nil { + return err + } + updateLaggingRanges(count) + timer.Reset(pollingInterval) + } + } + }) + } +} + // schemaChangeDetectedError is a sentinel error to indicate to Run() that the // schema change is stopping due to a schema change. This is handy to trigger // the context group to stop; the error is handled entirely in this package. @@ -178,6 +251,7 @@ type kvFeed struct { codec keys.SQLCodec onBackfillCallback func() func() + rangeObserver func(fn kvcoord.ForEachRangeFn) schemaChangeEvents changefeedbase.SchemaChangeEventClass schemaChangePolicy changefeedbase.SchemaChangePolicy @@ -473,11 +547,12 @@ func (f *kvFeed) runUntilTableEvent( g := ctxgroup.WithContext(ctx) physicalCfg := rangeFeedConfig{ - Spans: stps, - Frontier: resumeFrontier.Frontier(), - WithDiff: f.withDiff, - Knobs: f.knobs, - UseMux: f.useMux, + Spans: stps, + Frontier: resumeFrontier.Frontier(), + WithDiff: f.withDiff, + Knobs: f.knobs, + UseMux: f.useMux, + RangeObserver: f.rangeObserver, } g.GoCtx(func(ctx context.Context) error { diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 8ed3f8feceb1..4a3e11c5628c 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -27,11 +27,12 @@ type physicalFeedFactory interface { } type rangeFeedConfig struct { - Frontier hlc.Timestamp - Spans []kvcoord.SpanTimePair - WithDiff bool - Knobs TestingKnobs - UseMux bool + Frontier hlc.Timestamp + Spans []kvcoord.SpanTimePair + WithDiff bool + RangeObserver func(fn kvcoord.ForEachRangeFn) + Knobs TestingKnobs + UseMux bool } type rangefeedFactory func( @@ -79,6 +80,12 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang if cfg.UseMux { rfOpts = append(rfOpts, kvcoord.WithMuxRangeFeed()) } + if cfg.RangeObserver != nil { + rfOpts = append(rfOpts, kvcoord.WithRangeObserver(cfg.RangeObserver)) + } + if len(cfg.Knobs.RangefeedOptions) != 0 { + rfOpts = append(rfOpts, cfg.Knobs.RangefeedOptions...) + } g.GoCtx(func(ctx context.Context) error { return p(ctx, cfg.Spans, cfg.WithDiff, feed.eventC, rfOpts...) diff --git a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go index dd87b86979f7..f9f6257532a5 100644 --- a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go +++ b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go @@ -29,6 +29,8 @@ type TestingKnobs struct { // EndTimeReached is a callback that may return true to indicate the // feed should exit because its end time has been reached. EndTimeReached func() bool + // RangefeedOptions lets the kvfeed override rangefeed settings. + RangefeedOptions []kvcoord.RangeFeedOption } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 8401bd51e114..fd58de006791 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -66,6 +66,7 @@ type AggMetrics struct { SchemaRegistryRetries *aggmetric.AggCounter AggregatorProgress *aggmetric.AggGauge CheckpointProgress *aggmetric.AggGauge + LaggingRanges *aggmetric.AggGauge // There is always at least 1 sliMetrics created for defaultSLI scope. 
mu struct { @@ -123,6 +124,7 @@ type sliMetrics struct { SchemaRegistryRetries *aggmetric.Counter AggregatorProgress *aggmetric.Gauge CheckpointProgress *aggmetric.Gauge + LaggingRanges *aggmetric.Gauge mu struct { syncutil.Mutex @@ -563,6 +565,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { Measurement: "Unix Timestamp Nanoseconds", Unit: metric.Unit_TIMESTAMP_NS, } + metaLaggingRangePercentage := metric.Metadata{ + Name: "changefeed.lagging_ranges", + Help: "The number of ranges considered to be lagging behind", + Measurement: "Ranges", + Unit: metric.Unit_COUNT, + } functionalGaugeMinFn := func(childValues []int64) int64 { var min int64 @@ -628,6 +636,7 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { SchemaRegistrations: b.Counter(metaSchemaRegistryRegistrations), AggregatorProgress: b.FunctionalGauge(metaAggregatorProgress, functionalGaugeMinFn), CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn), + LaggingRanges: b.Gauge(metaLaggingRangePercentage), } a.mu.sliMetrics = make(map[string]*sliMetrics) _, err := a.getOrCreateScope(defaultSLIScope) @@ -684,6 +693,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { InternalRetryMessageCount: a.InternalRetryMessageCount.AddChild(scope), SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope), SchemaRegistrations: a.SchemaRegistrations.AddChild(scope), + LaggingRanges: a.LaggingRanges.AddChild(scope), } sm.mu.resolved = make(map[int64]hlc.Timestamp) sm.mu.checkpoint = make(map[int64]hlc.Timestamp) @@ -713,6 +723,31 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { return sm, nil } +// getLaggingRangesCallback returns a function which can be called to update the +// lagging ranges metric. It should be called with the current number of lagging +// ranges. +func (s *sliMetrics) getLaggingRangesCallback() func(int64) { + // Because this gauge is shared between changefeeds in the same metrics scope, + // we must instead modify it using `Inc` and `Dec` (as opposed to `Update`) to + // ensure values written by others are not overwritten. The code below is used + // to determine the deltas based on the last known number of lagging ranges. + // + // Example: + // + // Initially there are 0 lagging ranges, so `last` is 0. Assume the gauge + // has an arbitrary value X. + // + // If 10 ranges are behind, last=0,i=10: X.Dec(0 - 10) = X.Inc(10) + // If 3 ranges catch up, last=10,i=7: X.Dec(10 - 7) = X.Dec(3) + // If 4 ranges fall behind, last=7,i=11: X.Dec(7 - 11) = X.Inc(4) + // If 1 lagging range is deleted, last=7,i=10: X.Dec(11-10) = X.Dec(1) + var last int64 + return func(i int64) { + s.LaggingRanges.Dec(last - i) + last = i + } +} + // Metrics are for production monitoring of changefeeds. type Metrics struct { AggMetrics *AggMetrics diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index e2b10b25c980..daaab9ef5e50 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -84,9 +84,20 @@ func maxConcurrentCatchupScans(sv *settings.Values) int { return int(l) } +// ForEachRangeFn is used to execute `fn` over each range in a rangefeed. +type ForEachRangeFn func(fn ActiveRangeFeedIterFn) error + type rangeFeedConfig struct { useMuxRangeFeed bool overSystemTable bool + rangeObserver func(ForEachRangeFn) + + knobs struct { + // onRangefeedEvent invoked on each rangefeed event. 
+ // Returns boolean indicating if event should be skipped or an error + // indicating if rangefeed should terminate. + onRangefeedEvent func(ctx context.Context, s roachpb.Span, event *roachpb.RangeFeedEvent) (skip bool, _ error) + } } // RangeFeedOption configures a RangeFeed. @@ -113,6 +124,14 @@ func WithSystemTablePriority() RangeFeedOption { }) } +// WithRangeObserver is called when the rangefeed starts with a function that +// can be used to iterate over all the ranges. +func WithRangeObserver(observer func(ForEachRangeFn)) RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.rangeObserver = observer + }) +} + // A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation. var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true) @@ -180,6 +199,9 @@ func (ds *DistSender) RangeFeedSpans( rr := newRangeFeedRegistry(ctx, withDiff) ds.activeRangeFeeds.Store(rr, nil) defer ds.activeRangeFeeds.Delete(rr) + if cfg.rangeObserver != nil { + cfg.rangeObserver(rr.ForEachPartialRangefeed) + } catchupSem := limit.MakeConcurrentRequestLimiter( "distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV)) @@ -254,34 +276,42 @@ type PartialRangeFeed struct { // ActiveRangeFeedIterFn is an iterator function which is passed PartialRangeFeed structure. // Iterator function may return an iterutil.StopIteration sentinel error to stop iteration -// early; any other error is propagated. +// early. type ActiveRangeFeedIterFn func(rfCtx RangeFeedContext, feed PartialRangeFeed) error -// ForEachActiveRangeFeed invokes provided function for each active range feed. +const continueIter = true +const stopIter = false + +// ForEachActiveRangeFeed invokes provided function for each active rangefeed. +// iterutil.StopIteration can be returned by `fn` to stop iteration, and doing +// so will not return this error. func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr error) { - const continueIter = true - const stopIter = false + ds.activeRangeFeeds.Range(func(k, v interface{}) bool { + r := k.(*rangeFeedRegistry) + iterErr = r.ForEachPartialRangefeed(fn) + return iterErr == nil + }) + return iterutil.Map(iterErr) +} + +// ForEachPartialRangefeed invokes provided function for each partial rangefeed. Use manageIterationErrs +// if the fn uses iterutil.StopIteration to stop iteration. +func (r *rangeFeedRegistry) ForEachPartialRangefeed(fn ActiveRangeFeedIterFn) (iterErr error) { partialRangeFeed := func(active *activeRangeFeed) PartialRangeFeed { active.Lock() defer active.Unlock() return active.PartialRangeFeed } - - ds.activeRangeFeeds.Range(func(k, v interface{}) bool { - r := k.(*rangeFeedRegistry) - r.ranges.Range(func(k, v interface{}) bool { - active := k.(*activeRangeFeed) - if err := fn(r.RangeFeedContext, partialRangeFeed(active)); err != nil { - iterErr = err - return stopIter - } - return continueIter - }) - return iterErr == nil + r.ranges.Range(func(k, v interface{}) bool { + active := k.(*activeRangeFeed) + if err := fn(r.RangeFeedContext, partialRangeFeed(active)); err != nil { + iterErr = err + return stopIter + } + return continueIter }) - - return iterutil.Map(iterErr) + return iterErr } // activeRangeFeed is a thread safe PartialRangeFeed. 
@@ -660,6 +690,16 @@ func (ds *DistSender) singleRangeFeed( return args.Timestamp, err } + if cfg.knobs.onRangefeedEvent != nil { + skip, err := cfg.knobs.onRangefeedEvent(ctx, span, event) + if err != nil { + return args.Timestamp, err + } + if skip { + continue + } + } + msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span} switch t := event.GetValue().(type) { case *roachpb.RangeFeedCheckpoint: @@ -736,5 +776,14 @@ func (ds *DistSender) handleStuckEvent( return errors.Wrapf(errRestartStuckRange, "waiting for r%d %s [threshold %s]", args.RangeID, args.Replica, threshold) } +// TestingWithOnRangefeedEvent returns a test only option to modify rangefeed event. +func TestingWithOnRangefeedEvent( + fn func(ctx context.Context, s roachpb.Span, event *roachpb.RangeFeedEvent) (skip bool, _ error), +) RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.knobs.onRangefeedEvent = fn + }) +} + // sentinel error returned when cancelling rangefeed when it is stuck. var errRestartStuckRange = errors.New("rangefeed restarting due to inactivity") diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index 2ca094e3573e..f31c16b8ff6e 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -12,6 +12,8 @@ package kvcoord_test import ( "context" + "fmt" + "reflect" "sync/atomic" "testing" "time" @@ -191,6 +193,7 @@ func rangeFeed( startFrom hlc.Timestamp, onValue func(event kvcoord.RangeFeedMessage), useMuxRangeFeed bool, + opts ...kvcoord.RangeFeedOption, ) func() { ds := dsI.(*kvcoord.DistSender) events := make(chan kvcoord.RangeFeedMessage) @@ -198,7 +201,6 @@ func rangeFeed( g := ctxgroup.WithContext(ctx) g.GoCtx(func(ctx context.Context) (err error) { - var opts []kvcoord.RangeFeedOption if useMuxRangeFeed { opts = append(opts, kvcoord.WithMuxRangeFeed()) ctx = context.WithValue(ctx, useMuxRangeFeedCtxKey{}, struct{}{}) @@ -584,3 +586,114 @@ func TestRestartsStuckRangeFeedsSecondImplementation(t *testing.T) { // retry. require.NotZero(t, ds.Metrics().RangefeedRestartStuck.Count()) } + +// TestRangefeedRangeObserver ensures the kvcoord.WithRangeObserver option +// works correctly. +func TestRangefeedRangeObserver(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + ts := tc.Server(0) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + kvserver.RangefeedEnabled.Override( + context.Background(), &ts.ClusterSettings().SV, true) + + testutils.RunTrueAndFalse(t, "mux", func(t *testing.T, useMux bool) { + sqlDB.ExecMultiple(t, + `CREATE TABLE foo (key INT PRIMARY KEY)`, + `INSERT INTO foo (key) SELECT * FROM generate_series(1, 4)`, + `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, 4, 1))`, + ) + defer func() { + sqlDB.Exec(t, `DROP TABLE foo`) + }() + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + ignoreValues := func(event kvcoord.RangeFeedMessage) {} + + // Set up an observer to continuously poll for the list of ranges + // being watched. 
+ var observedRangesMu syncutil.Mutex + observedRanges := make(map[string]struct{}) + ctx2, cancel := context.WithCancel(context.Background()) + g := ctxgroup.WithContext(ctx2) + defer func() { + cancel() + err := g.Wait() + // Ensure the observer goroutine terminates gracefully via context cancellation. + require.True(t, testutils.IsError(err, "context canceled")) + }() + observer := func(fn kvcoord.ForEachRangeFn) { + g.GoCtx(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(200 * time.Millisecond): + } + observedRangesMu.Lock() + observedRanges = make(map[string]struct{}) + err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error { + observedRanges[feed.Span.String()] = struct{}{} + return nil + }) + observedRangesMu.Unlock() + if err != nil { + return err + } + } + }) + } + + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, ts.Clock().Now(), ignoreValues, useMux, + kvcoord.WithRangeObserver(observer)) + defer closeFeed() + + makeSpan := func(suffix string) string { + return fmt.Sprintf("/Table/%d/%s", fooDesc.GetID(), suffix) + } + + // The initial set of ranges we expect to observe. + expectedRanges := map[string]struct{}{ + makeSpan("1{-/1}"): {}, + makeSpan("1/{1-2}"): {}, + makeSpan("1/{2-3}"): {}, + makeSpan("1/{3-4}"): {}, + makeSpan("{1/4-2}"): {}, + } + checkExpectedRanges := func() { + testutils.SucceedsWithin(t, func() error { + observedRangesMu.Lock() + defer observedRangesMu.Unlock() + if !reflect.DeepEqual(observedRanges, expectedRanges) { + return errors.Newf("expected ranges %v, but got %v", expectedRanges, observedRanges) + } + return nil + }, 10*time.Second) + } + checkExpectedRanges() + + // Add another range and ensure we can observe it. + sqlDB.ExecMultiple(t, + `INSERT INTO FOO VALUES(5)`, + `ALTER TABLE foo SPLIT AT VALUES(5)`, + ) + expectedRanges = map[string]struct{}{ + makeSpan("1{-/1}"): {}, + makeSpan("1/{1-2}"): {}, + makeSpan("1/{2-3}"): {}, + makeSpan("1/{3-4}"): {}, + makeSpan("1/{4-5}"): {}, + makeSpan("{1/5-2}"): {}, + } + checkExpectedRanges() + }) +} diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index f0b5fd19d892..c94ff2669a5f 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1582,6 +1582,12 @@ var charts = []sectionDescription{ "changefeed.checkpoint_progress", }, }, + { + Title: "Changefeed Lagging Ranges", + Metrics: []string{ + "changefeed.lagging_ranges", + }, + }, }, }, { From 41f05f7afe206bdbd3b5ea2ae7ba56008e61d077 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 8 Sep 2023 10:06:36 -0400 Subject: [PATCH 2/2] changefeedccl: Fix data race in lagging spans metric Fix a race bug in lagging spans metric. 
Fixes: #110235 Release note: None --- pkg/ccl/changefeedccl/changefeed_test.go | 2 ++ pkg/ccl/changefeedccl/metrics.go | 11 ++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 381bb7e3c6e2..cafa223c150c 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1359,6 +1359,8 @@ func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) { func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(s.DB) diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index fd58de006791..cec7595cf8c6 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -741,10 +741,15 @@ func (s *sliMetrics) getLaggingRangesCallback() func(int64) { // If 3 ranges catch up, last=10,i=7: X.Dec(10 - 7) = X.Dec(3) // If 4 ranges fall behind, last=7,i=11: X.Dec(7 - 11) = X.Inc(4) // If 1 lagging range is deleted, last=7,i=10: X.Dec(11-10) = X.Dec(1) - var last int64 + last := struct { + syncutil.Mutex + v int64 + }{} return func(i int64) { - s.LaggingRanges.Dec(last - i) - last = i + last.Lock() + defer last.Unlock() + s.LaggingRanges.Dec(last.v - i) + last.v = i } }