diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 7504d67dd3fb..dfa36d567e10 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -50,6 +50,8 @@ kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide c
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records tenant-ro
kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updates are delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval tenant-ro
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled tenant-rw
+kv.rangefeed.lagging_ranges_frequency duration 1m0s controls the frequency at which a rangefeed checks for ranges which have fallen behind tenant-rw
+kv.rangefeed.lagging_ranges_threshold duration 3m0s controls how far behind a range must be from the present to be considered 'lagging' in metrics tenant-rw
kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence) tenant-rw
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions tenant-rw
kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions tenant-rw
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index b41ceaadc906..77bbe7502939 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -71,6 +71,8 @@
kv.range_split.load_qps_threshold
| integer | 2500 | the QPS over which, the range becomes a candidate for load based splitting | Dedicated/Self-Hosted |
kv.rangefeed.closed_timestamp_refresh_interval
| duration | 3s | the interval at which closed-timestamp updates are delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval | Serverless/Dedicated/Self-Hosted (read-only) |
kv.rangefeed.enabled
| boolean | false | if set, rangefeed registration is enabled | Serverless/Dedicated/Self-Hosted |
+kv.rangefeed.lagging_ranges_frequency
| duration | 1m0s | controls the frequency at which a rangefeed checks for ranges which have fallen behind | Serverless/Dedicated/Self-Hosted |
+kv.rangefeed.lagging_ranges_threshold
| duration | 3m0s | controls how far behind a range must be from the present to be considered 'lagging' in metrics | Serverless/Dedicated/Self-Hosted |
kv.rangefeed.range_stuck_threshold
| duration | 1m0s | restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence) | Serverless/Dedicated/Self-Hosted |
kv.replica_circuit_breaker.slow_replication_threshold
| duration | 1m0s | duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers) | Dedicated/Self-Hosted |
kv.replica_stats.addsst_request_size_factor
| integer | 50000 | the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1 | Dedicated/Self-Hosted |
diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index bc7ef3f8084d..fb157b78b4a9 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -237,6 +237,7 @@ go_test(
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
+ "//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/roachpb",
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 60a9b05f67da..1201576c0319 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -455,6 +455,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV),
+ UpdateLaggingRanges: ca.sliMetrics.getLaggingRangesCallback(),
}, nil
}
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 270d6a4bd2c7..ad2ee176ea73 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -50,6 +50,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
@@ -1226,6 +1228,124 @@ func TestChangefeedInitialScan(t *testing.T) {
cdcTest(t, testFn)
}
+// TestChangefeedLaggingRangesMetrics tests the behavior of the
+// changefeed.lagging_ranges metric.
+func TestChangefeedLaggingRangesMetrics(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+ sqlDB := sqlutils.MakeSQLRunner(s.DB)
+
+ // Set a small threshold and check frequency so lagging ranges can be detected faster.
+ kvcoord.LaggingRangesThreshold.Override(
+ context.Background(), &s.Server.ClusterSettings().SV, 250*time.Millisecond)
+ kvcoord.LaggingRangesCheckFrequency.Override(
+ context.Background(), &s.Server.ClusterSettings().SV, 25*time.Millisecond)
+
+ // Ensure a fast closed timestamp interval so ranges can catch up quickly.
+ kvserver.RangeFeedRefreshInterval.Override(
+ context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)
+ closedts.SideTransportCloseInterval.Override(
+ context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)
+ closedts.TargetDuration.Override(
+ context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)
+
+ skipMu := syncutil.Mutex{}
+ skippedRanges := map[string]struct{}{}
+ numRanges := 10
+ numRangesToSkip := int64(4)
+ var stopSkip atomic.Bool
+ // `shouldSkip` continuously skips checkpoints for the first `numRangesToSkip` ranges it sees.
+ // Skipping is disabled by setting `stopSkip` to true.
+ shouldSkip := func(event *kvpb.RangeFeedEvent) bool {
+ if stopSkip.Load() {
+ return false
+ }
+ switch event.GetValue().(type) {
+ case *kvpb.RangeFeedCheckpoint:
+ sp := event.Checkpoint.Span
+ skipMu.Lock()
+ defer skipMu.Unlock()
+ if _, ok := skippedRanges[sp.String()]; ok || int64(len(skippedRanges)) < numRangesToSkip {
+ skippedRanges[sp.String()] = struct{}{}
+ return true
+ }
+ }
+ return false
+ }
+
+ knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)
+
+ knobs.FeedKnobs.RangefeedOptions = append(knobs.FeedKnobs.RangefeedOptions, kvcoord.TestingWithOnRangefeedEvent(
+ func(ctx context.Context, s roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
+ return shouldSkip(event), nil
+ }),
+ )
+
+ registry := s.Server.JobRegistry().(*jobs.Registry)
+ sli1, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t1")
+ require.NoError(t, err)
+ laggingRangesTier1 := sli1.LaggingRanges
+ sli2, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t2")
+ require.NoError(t, err)
+ laggingRangesTier2 := sli2.LaggingRanges
+
+ assertLaggingRanges := func(tier string, expected int64) {
+ testutils.SucceedsWithin(t, func() error {
+ var laggingRangesObserved int64
+ if tier == "t1" {
+ laggingRangesObserved = laggingRangesTier1.Value()
+ } else {
+ laggingRangesObserved = laggingRangesTier2.Value()
+ }
+ if laggingRangesObserved != expected {
+ return fmt.Errorf("expected %d lagging ranges, but found %d", expected, laggingRangesObserved)
+ }
+ return nil
+ }, 10*time.Second)
+ }
+
+ sqlDB.Exec(t, fmt.Sprintf(`
+ CREATE TABLE foo (key INT PRIMARY KEY);
+ INSERT INTO foo (key) SELECT * FROM generate_series(1, %d);
+ ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, %d, 1));
+ `, numRanges, numRanges-1))
+ sqlDB.CheckQueryResults(t, `SELECT count(*) FROM [SHOW RANGES FROM TABLE foo]`,
+ [][]string{{fmt.Sprint(numRanges)}},
+ )
+
+ feed1Tier1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1"`)
+ feed2Tier1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1"`)
+ feed3Tier2 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t2"`)
+
+ assertLaggingRanges("t1", numRangesToSkip*2)
+ assertLaggingRanges("t2", numRangesToSkip)
+
+ stopSkip.Store(true)
+ assertLaggingRanges("t1", 0)
+ assertLaggingRanges("t2", 0)
+
+ stopSkip.Store(false)
+ assertLaggingRanges("t1", numRangesToSkip*2)
+ assertLaggingRanges("t2", numRangesToSkip)
+
+ require.NoError(t, feed1Tier1.Close())
+ assertLaggingRanges("t1", numRangesToSkip)
+ assertLaggingRanges("t2", numRangesToSkip)
+
+ require.NoError(t, feed2Tier1.Close())
+ assertLaggingRanges("t1", 0)
+ assertLaggingRanges("t2", numRangesToSkip)
+
+ require.NoError(t, feed3Tier2.Close())
+ assertLaggingRanges("t1", 0)
+ assertLaggingRanges("t2", 0)
+ }
+ // Can't run on tenants due to lack of SPLIT AT support (#54254)
+ cdcTest(t, testFn, feedTestNoTenants, feedTestEnterpriseSinks)
+}
+
func TestChangefeedBackfillObservability(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
index 5147e9d8c92b..bad05a2d0687 100644
--- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -47,6 +47,7 @@ type Config struct {
Metrics *kvevent.Metrics
OnBackfillCallback func() func()
OnBackfillRangeCallback func(int64) (func(), func())
+ UpdateLaggingRanges func(int64)
MM *mon.BytesMonitor
WithDiff bool
SchemaChangeEvents changefeedbase.SchemaChangeEventClass
@@ -107,6 +108,7 @@ func Run(ctx context.Context, cfg Config) error {
cfg.SchemaFeed,
sc, pff, bf, cfg.UseMux, cfg.Targets, cfg.Knobs)
f.onBackfillCallback = cfg.OnBackfillCallback
+ f.updateLaggingRanges = cfg.UpdateLaggingRanges
g := ctxgroup.WithContext(ctx)
g.GoCtx(cfg.SchemaFeed.Run)
@@ -174,9 +176,10 @@ type kvFeed struct {
writer kvevent.Writer
codec keys.SQLCodec
- onBackfillCallback func() func()
- schemaChangeEvents changefeedbase.SchemaChangeEventClass
- schemaChangePolicy changefeedbase.SchemaChangePolicy
+ onBackfillCallback func() func()
+ updateLaggingRanges func(int64)
+ schemaChangeEvents changefeedbase.SchemaChangeEventClass
+ schemaChangePolicy changefeedbase.SchemaChangePolicy
useMux bool
@@ -485,11 +488,12 @@ func (f *kvFeed) runUntilTableEvent(
g := ctxgroup.WithContext(ctx)
physicalCfg := rangeFeedConfig{
- Spans: stps,
- Frontier: resumeFrontier.Frontier(),
- WithDiff: f.withDiff,
- Knobs: f.knobs,
- UseMux: f.useMux,
+ Spans: stps,
+ Frontier: resumeFrontier.Frontier(),
+ WithDiff: f.withDiff,
+ Knobs: f.knobs,
+ UseMux: f.useMux,
+ UpdateLaggingRanges: f.updateLaggingRanges,
}
// The following two synchronous calls works as follows:
diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
index d8332e189828..dfca1f425a91 100644
--- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
@@ -27,11 +27,12 @@ type physicalFeedFactory interface {
}
type rangeFeedConfig struct {
- Frontier hlc.Timestamp
- Spans []kvcoord.SpanTimePair
- WithDiff bool
- Knobs TestingKnobs
- UseMux bool
+ Frontier hlc.Timestamp
+ Spans []kvcoord.SpanTimePair
+ WithDiff bool
+ UpdateLaggingRanges func(int64)
+ Knobs TestingKnobs
+ UseMux bool
}
type rangefeedFactory func(
@@ -81,6 +82,12 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
if cfg.WithDiff {
rfOpts = append(rfOpts, kvcoord.WithDiff())
}
+ if cfg.UpdateLaggingRanges != nil {
+ rfOpts = append(rfOpts, kvcoord.WithLaggingRangesUpdate(cfg.UpdateLaggingRanges))
+ }
+ if len(cfg.Knobs.RangefeedOptions) != 0 {
+ rfOpts = append(rfOpts, cfg.Knobs.RangefeedOptions...)
+ }
g.GoCtx(func(ctx context.Context) error {
return p(ctx, cfg.Spans, feed.eventC, rfOpts...)
diff --git a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go
index 1982d6cdb9d0..51e10ab7fdc7 100644
--- a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go
+++ b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go
@@ -33,6 +33,9 @@ type TestingKnobs struct {
// ModifyTimestamps is called on the timestamp for each RangefeedMessage
// before converting it into a kv event.
ModifyTimestamps func(*hlc.Timestamp)
+ // RangefeedOptions are additional options passed to the rangefeed that
+ // underlies the kvfeed.
+ RangefeedOptions []kvcoord.RangeFeedOption
}
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index cde48c442fc9..90f0e7622702 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -70,6 +70,7 @@ type AggMetrics struct {
SchemaRegistryRetries *aggmetric.AggCounter
AggregatorProgress *aggmetric.AggGauge
CheckpointProgress *aggmetric.AggGauge
+ LaggingRanges *aggmetric.AggGauge
// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
@@ -132,6 +133,7 @@ type sliMetrics struct {
SchemaRegistryRetries *aggmetric.Counter
AggregatorProgress *aggmetric.Gauge
CheckpointProgress *aggmetric.Gauge
+ LaggingRanges *aggmetric.Gauge
mu struct {
syncutil.Mutex
@@ -607,6 +609,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Unix Timestamp Nanoseconds",
Unit: metric.Unit_TIMESTAMP_NS,
}
+ metaLaggingRanges := metric.Metadata{
+ Name: "changefeed.lagging_ranges",
+ Help: "The number of ranges considered to be lagging behind",
+ Measurement: "Ranges",
+ Unit: metric.Unit_COUNT,
+ }
functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
@@ -617,6 +625,7 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
}
return min
}
+
// NB: When adding new histograms, use sigFigs = 1. Older histograms
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
@@ -681,6 +690,7 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
SchemaRegistrations: b.Counter(metaSchemaRegistryRegistrations),
AggregatorProgress: b.FunctionalGauge(metaAggregatorProgress, functionalGaugeMinFn),
CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
+ LaggingRanges: b.Gauge(metaLaggingRanges),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
@@ -740,6 +750,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
InternalRetryMessageCount: a.InternalRetryMessageCount.AddChild(scope),
SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope),
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
+ LaggingRanges: a.LaggingRanges.AddChild(scope),
}
sm.mu.resolved = make(map[int64]hlc.Timestamp)
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
@@ -769,6 +780,31 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
return sm, nil
}
+// getLaggingRangesCallback returns a function which is used to update the
+// lagging ranges metric. The returned function should be called with the
+// current number of lagging ranges.
+func (s *sliMetrics) getLaggingRangesCallback() func(int64) {
+ // Because this gauge is shared between changefeeds in the same metrics scope,
+ // it must be modified using `Inc` and `Dec` (as opposed to `Update`) so that
+ // values written by other changefeeds are not overwritten. The code below
+ // determines the delta to apply based on the last known number of lagging ranges.
+ //
+ // Example:
+ //
+ // Initially there are 0 lagging ranges, so `last` is 0. Assume the gauge
+ // has an arbitrary value X.
+ //
+ // If 10 ranges are behind, last=0,i=10: X.Dec(0 - 10) = X.Inc(10)
+ // If 3 ranges catch up, last=10,i=7: X.Dec(10 - 7) = X.Dec(3)
+ // If 4 ranges fall behind, last=7,i=11: X.Dec(7 - 11) = X.Inc(4)
+ // If 1 lagging range is deleted, last=11,i=10: X.Dec(11 - 10) = X.Dec(1)
+ var last int64
+ return func(i int64) {
+ s.LaggingRanges.Dec(last - i)
+ last = i
+ }
+}
+
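The delta arithmetic above generalizes to any number of changefeeds sharing one gauge. A minimal standalone sketch of the same pattern, using a toy gauge type rather than the real aggmetric.Gauge:

package main

import "fmt"

// gauge is a toy stand-in for the shared aggmetric.Gauge: callers only apply
// deltas to it, which is why each callback tracks the last count it reported.
type gauge struct{ v int64 }

func (g *gauge) Inc(d int64) { g.v += d }
func (g *gauge) Dec(d int64) { g.v -= d }

// laggingRangesCallback mirrors getLaggingRangesCallback: the returned closure
// remembers the last count it reported and applies only the difference.
func laggingRangesCallback(g *gauge) func(int64) {
	var last int64
	return func(i int64) {
		g.Dec(last - i)
		last = i
	}
}

func main() {
	shared := &gauge{}
	feedA := laggingRangesCallback(shared)
	feedB := laggingRangesCallback(shared)

	feedA(10) // feed A reports 10 lagging ranges -> gauge = 10
	feedB(4)  // feed B reports 4 lagging ranges  -> gauge = 14
	feedA(7)  // feed A catches up on 3 ranges    -> gauge = 11
	feedB(0)  // feed B fully catches up          -> gauge = 7
	fmt.Println(shared.v)
}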
// Metrics are for production monitoring of changefeeds.
type Metrics struct {
AggMetrics *AggMetrics
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
index 4c0e2105bd59..5c48dfed2fc8 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -94,8 +94,12 @@ func muxRangeFeed(
if cfg.knobs.metrics != nil {
m.metrics = cfg.knobs.metrics
}
-
divideAllSpansOnRangeBoundaries(spans, m.startSingleRangeFeed, ds, &m.g)
+ if cfg.withLaggingRangesUpdate != nil {
+ m.g.GoCtx(func(ctx context.Context) error {
+ return ds.monitorLaggingRanges(ctx, rr, cfg.withLaggingRangesUpdate)
+ })
+ }
return errors.CombineErrors(m.g.Wait(), ctx.Err())
}
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index 0dc209f7bb02..430e89a781a0 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -81,6 +81,28 @@ var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
settings.NonNegativeDuration,
settings.WithPublic)
+// LaggingRangesCheckFrequency is the frequency at which the rangefeed will
+// check for ranges which have fallen behind.
+var LaggingRangesCheckFrequency = settings.RegisterDurationSetting(
+ settings.TenantWritable,
+ "kv.rangefeed.lagging_ranges_frequency",
+ "controls the frequency at which a rangefeed checks for ranges which have fallen behind",
+ 1*time.Minute,
+ settings.NonNegativeDuration,
+ settings.WithPublic,
+)
+
+// LaggingRangesThreshold is how far behind a range must be from the present
+// to be considered 'lagging' in metrics.
+var LaggingRangesThreshold = settings.RegisterDurationSetting(
+ settings.TenantWritable,
+ "kv.rangefeed.lagging_ranges_threshold",
+ "controls how far behind a range must be from the present to be considered as 'lagging' behind in metrics",
+ 3*time.Minute,
+ settings.NonNegativeDuration,
+ settings.WithPublic,
+)
+
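Both settings are ordinary duration cluster settings, so they can be overridden to make lagging ranges detectable within milliseconds, as the tests added in this change do. A sketch under that assumption; the package and helper names here are illustrative only:

package kvcoordtest // illustrative package name

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// tightenLaggingRangeDetection mirrors the overrides used by the tests in this
// change: a 250ms lag threshold checked every 25ms. Values are illustrative.
func tightenLaggingRangeDetection(ctx context.Context, st *cluster.Settings) {
	kvcoord.LaggingRangesThreshold.Override(ctx, &st.SV, 250*time.Millisecond)
	kvcoord.LaggingRangesCheckFrequency.Override(ctx, &st.SV, 25*time.Millisecond)
}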
func maxConcurrentCatchupScans(sv *settings.Values) int {
l := catchupScanConcurrency.Get(sv)
if l == 0 {
@@ -90,9 +112,10 @@ func maxConcurrentCatchupScans(sv *settings.Values) int {
}
type rangeFeedConfig struct {
- useMuxRangeFeed bool
- overSystemTable bool
- withDiff bool
+ useMuxRangeFeed bool
+ overSystemTable bool
+ withDiff bool
+ withLaggingRangesUpdate func(int64)
knobs struct {
// onRangefeedEvent invoked on each rangefeed event.
@@ -139,6 +162,15 @@ func WithDiff() RangeFeedOption {
})
}
+// WithLaggingRangesUpdate registers a callback which is called periodically
+// with the number of lagging ranges. The frequency and strictness of this check
+// are determined by cluster settings in this package.
+func WithLaggingRangesUpdate(f func(int64)) RangeFeedOption {
+ return optionFunc(func(c *rangeFeedConfig) {
+ c.withLaggingRangesUpdate = f
+ })
+}
+
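WithLaggingRangesUpdate follows the same functional-options pattern as WithDiff above. A toy illustration of how such an option carries a caller-supplied callback into an internal config (toy types, not the actual kvcoord internals):

package main

import "fmt"

// config is a toy mirror of rangeFeedConfig.
type config struct{ onLaggingRanges func(int64) }

type option interface{ apply(*config) }

type optionFunc func(*config)

func (f optionFunc) apply(c *config) { f(c) }

// withLaggingRangesUpdate mirrors WithLaggingRangesUpdate above.
func withLaggingRangesUpdate(fn func(int64)) option {
	return optionFunc(func(c *config) { c.onLaggingRanges = fn })
}

// newFeed mirrors how the dist sender folds options into its config.
func newFeed(opts ...option) *config {
	c := &config{}
	for _, o := range opts {
		o.apply(c)
	}
	return c
}

func main() {
	feed := newFeed(withLaggingRangesUpdate(func(n int64) {
		fmt.Printf("lagging ranges: %d\n", n)
	}))
	// The monitor goroutine would invoke this periodically with the latest count.
	feed.onLaggingRanges(4)
}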
// A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation.
var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true)
@@ -239,12 +271,55 @@ func (ds *DistSender) RangeFeedSpans(
}
})
+ if cfg.withLaggingRangesUpdate != nil {
+ g.GoCtx(func(ctx context.Context) error {
+ return ds.monitorLaggingRanges(ctx, rr, cfg.withLaggingRangesUpdate)
+ })
+ }
+
// Kick off the initial set of ranges.
divideAllSpansOnRangeBoundaries(spans, sendSingleRangeInfo(rangeCh), ds, &g)
return g.Wait()
}
+func (ds *DistSender) monitorLaggingRanges(
+ ctx context.Context, rr *rangeFeedRegistry, updateLaggingRanges func(int64),
+) error {
+ // If we are getting shut down, we should reset this metric.
+ defer func() {
+ updateLaggingRanges(0)
+ }()
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-time.After(LaggingRangesCheckFrequency.Get(&ds.st.SV)):
+ count := int64(0)
+ thresholdTS := timeutil.Now().Add(-1 * LaggingRangesThreshold.Get(&ds.st.SV))
+ if err := rr.ForEachPartialRangefeed(func(rfCtx RangeFeedContext, feed PartialRangeFeed) error {
+ // The resolved timestamp of a partial rangefeed indicates how far it has
+ // caught up. During a catchup scan it is not yet set, so we use the time
+ // the partial rangefeed was created as its starting point instead.
+ ts := hlc.Timestamp{WallTime: feed.CreatedTime.UnixNano()}
+ if !feed.Resolved.EqOrdering(hlc.Timestamp{}) {
+ ts = feed.Resolved
+ }
+
+ if ts.Less(hlc.Timestamp{WallTime: thresholdTS.UnixNano()}) {
+ count += 1
+ }
+ return nil
+ }, true); err != nil {
+ return err
+ }
+ updateLaggingRanges(count)
+ }
+ }
+}
+
// divideAllSpansOnRangeBoundaries divides all spans on range boundaries and invokes
// provided onRange function for each range. Resolution happens concurrently using provided
// context group.
@@ -303,34 +378,45 @@ type PartialRangeFeed struct {
// ActiveRangeFeedIterFn is an iterator function which is passed PartialRangeFeed structure.
// Iterator function may return an iterutil.StopIteration sentinel error to stop iteration
-// early; any other error is propagated.
+// early.
type ActiveRangeFeedIterFn func(rfCtx RangeFeedContext, feed PartialRangeFeed) error
-// ForEachActiveRangeFeed invokes provided function for each active range feed.
+const continueIter = true
+const stopIter = false
+
+// ForEachActiveRangeFeed invokes provided function for each active rangefeed.
func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr error) {
- const continueIter = true
- const stopIter = false
+ ds.activeRangeFeeds.Range(func(k, v interface{}) bool {
+ r := k.(*rangeFeedRegistry)
+ iterErr = r.ForEachPartialRangefeed(fn, false)
+ return iterErr == nil
+ })
+
+ return iterutil.Map(iterErr)
+}
+
+// ForEachPartialRangefeed invokes the provided function for each partial
+// rangefeed in this registry. Set manageIterationErrs to true if fn uses the
+// iterutil.StopIteration sentinel to stop iteration early; the sentinel is
+// then mapped to a nil return.
+func (r *rangeFeedRegistry) ForEachPartialRangefeed(
+ fn ActiveRangeFeedIterFn, manageIterationErrs bool,
+) (iterErr error) {
partialRangeFeed := func(active *activeRangeFeed) PartialRangeFeed {
active.Lock()
defer active.Unlock()
return active.PartialRangeFeed
}
-
- ds.activeRangeFeeds.Range(func(k, v interface{}) bool {
- r := k.(*rangeFeedRegistry)
- r.ranges.Range(func(k, v interface{}) bool {
- active := k.(*activeRangeFeed)
- if err := fn(r.RangeFeedContext, partialRangeFeed(active)); err != nil {
- iterErr = err
- return stopIter
- }
- return continueIter
- })
- return iterErr == nil
+ r.ranges.Range(func(k, v interface{}) bool {
+ active := k.(*activeRangeFeed)
+ if err := fn(r.RangeFeedContext, partialRangeFeed(active)); err != nil {
+ iterErr = err
+ return stopIter
+ }
+ return continueIter
})
-
- return iterutil.Map(iterErr)
+ if manageIterationErrs {
+ return iterutil.Map(iterErr)
+ }
+ return iterErr
}
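For callers inside this package, manageIterationErrs determines whether an iterutil.StopIteration sentinel returned by fn is surfaced or mapped to nil. A hedged sketch of such a caller, assuming the iterutil helpers used elsewhere in this file:

// samplePartialRangefeeds is a hypothetical in-package helper: it snapshots up
// to max partial rangefeeds and stops early via the StopIteration sentinel.
// With manageIterationErrs=true, the sentinel is swallowed and err is nil.
func samplePartialRangefeeds(rr *rangeFeedRegistry, max int) ([]PartialRangeFeed, error) {
	var out []PartialRangeFeed
	err := rr.ForEachPartialRangefeed(func(_ RangeFeedContext, feed PartialRangeFeed) error {
		out = append(out, feed)
		if len(out) >= max {
			return iterutil.StopIteration()
		}
		return nil
	}, true /* manageIterationErrs */)
	return out, err
}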
// activeRangeFeed is a thread safe PartialRangeFeed.
@@ -445,10 +531,10 @@ func newActiveRangeFeed(
StartAfter: startAfter,
CreatedTime: timeutil.Now(),
},
- release: func() {
- rr.ranges.Delete(active)
- c.Dec(1)
- },
+ }
+ active.release = func() {
+ rr.ranges.Delete(active)
+ c.Dec(1)
}
rr.ranges.Store(active, nil)
c.Inc(1)
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
index 2452db21890b..18a8fdc646cf 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
@@ -805,6 +806,121 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
})
}
+// TestRangefeedLaggingRangesCallback ensures the
+// kvcoord.WithLaggingRangesUpdate callback works correctly.
+func TestRangefeedLaggingRangesCallback(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ ctx := context.Background()
+ tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
+ defer tc.Stopper().Stop(ctx)
+
+ ts := tc.Server(0)
+ sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+
+ // Set a small threshold and check frequency so lagging ranges can be detected faster.
+ kvcoord.LaggingRangesThreshold.Override(
+ context.Background(), &ts.ClusterSettings().SV, 250*time.Millisecond)
+ kvcoord.LaggingRangesCheckFrequency.Override(
+ context.Background(), &ts.ClusterSettings().SV, 25*time.Millisecond)
+
+ // Ensure a fast closed timestamp interval so ranges can catch up quickly.
+ kvserver.RangeFeedRefreshInterval.Override(
+ context.Background(), &ts.ClusterSettings().SV, 20*time.Millisecond)
+ closedts.SideTransportCloseInterval.Override(
+ context.Background(), &ts.ClusterSettings().SV, 20*time.Millisecond)
+ closedts.TargetDuration.Override(
+ context.Background(), &ts.ClusterSettings().SV, 20*time.Millisecond)
+
+ kvserver.RangefeedEnabled.Override(
+ context.Background(), &ts.ClusterSettings().SV, true)
+
+ // Create 10 ranges and plan to put 4 into a lagging state.
+ numRangesToSkip := int64(4)
+ sqlDB.ExecMultiple(t,
+ `CREATE TABLE foo (key INT PRIMARY KEY)`,
+ `INSERT INTO foo (key) SELECT * FROM generate_series(1, 10)`,
+ `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, 9, 1))`,
+ )
+
+ fooDesc := desctestutils.TestingGetPublicTableDescriptor(
+ ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo")
+ fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)
+
+ testutils.RunTrueAndFalse(t, "mux", func(t *testing.T, useMux bool) {
+ ignoreValues := func(event kvcoord.RangeFeedMessage) {}
+
+ skipMu := syncutil.Mutex{}
+ skippedRanges := map[string]struct{}{}
+ var stopSkip atomic.Bool
+ // `shouldSkip` continuously skips checkpoints for the first `numRangesToSkip` ranges it sees.
+ // Skipping is disabled by setting `stopSkip` to true.
+ shouldSkip := func(event *kvpb.RangeFeedEvent) bool {
+ if stopSkip.Load() {
+ return false
+ }
+ switch event.GetValue().(type) {
+ case *kvpb.RangeFeedCheckpoint:
+ sp := event.Checkpoint.Span
+ skipMu.Lock()
+ defer skipMu.Unlock()
+ if _, ok := skippedRanges[sp.String()]; ok || int64(len(skippedRanges)) < numRangesToSkip {
+ skippedRanges[sp.String()] = struct{}{}
+ return true
+ }
+ }
+ return false
+ }
+
+ var numLaggingRanges atomic.Int64
+ laggingRangesCallback := func(i int64) {
+ numLaggingRanges.Store(i)
+ }
+ // Upon shutdown, we should report zero lagging ranges.
+ defer func() {
+ require.EqualValues(t, 0, numLaggingRanges.Load())
+ }()
+
+ closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, ts.Clock().Now(), ignoreValues, useMux,
+ kvcoord.TestingWithOnRangefeedEvent(
+ func(ctx context.Context, s roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
+ return shouldSkip(event), nil
+ }),
+ kvcoord.WithLaggingRangesUpdate(laggingRangesCallback))
+ defer closeFeed()
+
+ // Assert there are `numRangesToSkip` lagging ranges.
+ testutils.SucceedsWithin(t, func() error {
+ observedLaggingRanges := numLaggingRanges.Load()
+ if observedLaggingRanges != numRangesToSkip {
+ return errors.Newf("expected %d lagging ranges but found %d", numRangesToSkip, observedLaggingRanges)
+ }
+ return nil
+ }, 10*time.Second)
+
+ // Stop skipping checkpoints. Assert that all ranges catch up.
+ stopSkip.Store(true)
+ testutils.SucceedsWithin(t, func() error {
+ observedLaggingRanges := numLaggingRanges.Load()
+ if observedLaggingRanges != 0 {
+ return errors.Newf("expected %d lagging ranges but found %d", numRangesToSkip, observedLaggingRanges)
+ }
+ return nil
+ }, 10*time.Second)
+
+ // Start skipping ranges again and assert that there are `numRangesToSkip` lagging ranges.
+ stopSkip.Store(false)
+ testutils.SucceedsWithin(t, func() error {
+ observedLaggingRanges := numLaggingRanges.Load()
+ if observedLaggingRanges != numRangesToSkip {
+ return errors.Newf("expected %d lagging ranges but found %d", numRangesToSkip, observedLaggingRanges)
+ }
+ return nil
+ }, 10*time.Second)
+ })
+}
+
// TestMuxRangeFeedCanCloseStream verifies stream termination functionality in mux rangefeed.
func TestMuxRangeFeedCanCloseStream(t *testing.T) {
defer leaktest.AfterTest(t)()